// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.


#include <krpc/utility/compat.h>                        // OS_MACOSX
#include <krpc/utility/ssl_compat.h>                    // BIO_fd_non_fatal_error
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <netinet/tcp.h>                         // getsockopt
#include <krpc/kthread/unstable.h>                    // kthread_timer_del
#include <krpc/utility/fd_utility.h>                     // make_non_blocking
#include <krpc/utility/fd_guard.h>                       // fd_guard
#include <krpc/utility/time.h>                           // cpuwide_time_us
#include <krpc/utility/object_pool.h>                    // get_object
#include <krpc/utility/logging.h>                        // CHECK
#include <krpc/utility/macros.h>
#include <krpc/utility/class_name.h>                     // kutil::class_name
#include <krpc/utility/memory/scope_guard.h>
#include <krpc/rpc/log.h>
#include <krpc/rpc/errno.pb.h>
#include <krpc/rpc/event_dispatcher.h>          // RemoveConsumer
#include <krpc/rpc/socket.h>
#include <krpc/rpc/describable.h>               // Describable
#include <krpc/rpc/circuit_breaker.h>           // CircuitBreaker
#include <krpc/rpc/input_messenger.h>
#include <krpc/rpc/details/sparse_minute_counter.h>
#include <krpc/rpc/stream_impl.h>
#include <krpc/rpc/shared_object.h>
#include <krpc/rpc/policy/rtmp_protocol.h>  // FIXME
#include <krpc/rpc/periodic_task.h>
#include <krpc/rpc/details/health_check.h>
#include <krpc/rpc/rdma/rdma_endpoint.h>
#include <krpc/rpc/rdma/rdma_helper.h>
#include <krpc/rpc/config.h>

#if defined(OS_MACOSX)
#include <sys/event.h>
#endif

namespace kthread {
    size_t __attribute__((weak))
    get_sizes(const kthread_id_list_t *list, size_t *cnt, size_t n);
}


namespace krpc {

    // Duration (ms) of one round of waiting for EPOLLOUT; presumably used by
    // the epoll-out waiting logic later in this file — confirm at call sites.
    const int WAIT_EPOLLOUT_TIMEOUT_MS = 50;

    // A pool of sockets sharing one remote side (see `_remote_side'), used to
    // implement pooled connections. Access to the pooled ids is serialized by
    // `_mutex'. Cacheline-aligned to avoid false sharing between pools.
    class KRPC_CACHELINE_ALIGNMENT SocketPool {
        friend class Socket;

    public:
        explicit SocketPool(const SocketOptions &opt);

        ~SocketPool();

        // Get an address-able socket. If the pool is empty, create one.
        // Returns 0 on success.
        int GetSocket(SocketUniquePtr *ptr);

        // Return a socket (which was returned by GetSocket) back to the pool,
        // if the pool is full, setfail the socket directly.
        void ReturnSocket(Socket *sock);

        // Get all pooled sockets inside.
        void ListSockets(std::vector<SocketId> *list, size_t max_count);

    private:
        // options used to create this instance
        SocketOptions _options;
        // Protects `_pool'.
        kutil::Mutex _mutex;
        // Ids of idle sockets available to GetSocket().
        std::vector<SocketId> _pool;
        // Remote endpoint shared by all sockets of this pool.
        kutil::EndPoint _remote_side;
        kutil::atomic<int> _numfree; // #free sockets in all sub pools.
        kutil::atomic<int> _numinflight; // #inflight sockets in all sub pools.
    };

// NOTE: sizeof of this class is 1200 bytes. If we have 10K sockets, total
// memory is 12MB, not lightweight, but acceptable.
    struct ExtendedSocketStat : public SocketStat {
        // For computing stat.
        // Snapshots of the cumulative in/out counters taken at the previous
        // UpdateStatsEverySecond() call; per-second rates are the deltas
        // against the current cumulative values.
        size_t last_in_size;
        size_t last_in_num_messages;
        size_t last_out_size;
        size_t last_out_num_messages;

        // One second's worth of traffic; pushed into `_minute_counter' so
        // that sliding per-minute sums can be maintained by add/subtract.
        struct Sampled {
            uint32_t in_size_s;
            uint32_t in_num_messages_s;
            uint32_t out_size_s;
            uint32_t out_num_messages_s;
        };
        SparseMinuteCounter<Sampled> _minute_counter;

        ExtendedSocketStat()
                : last_in_size(0), last_in_num_messages(0), last_out_size(0), last_out_num_messages(0) {
            // Zero only the SocketStat base sub-object; the members of this
            // derived struct are initialized by the list above.
            memset((SocketStat *) this, 0, sizeof(SocketStat));
        }
    };

// Shared by main socket and derivative sockets.
    class Socket::SharedPart : public SharedObject {
    public:
        // A pool of sockets on which only a request can be sent, corresponding
        // to CONNECTION_TYPE_POOLED. When RPC begins, it picks one socket from
        // this pool and send the request, when the RPC ends, it returns the
        // socket back to this pool.
        // Before rev <= r32136, the pool is managed globally in socket_map.cpp
        // which has the disadvantage that accesses to different pools contend
        // with each other.
        kutil::atomic<SocketPool *> socket_pool;

        // The socket newing this object.
        SocketId creator_socket_id;

        // Counting number of continuous ETIMEDOUT
        kutil::atomic<int> num_continuous_connect_timeouts;

        // _in_size, _in_num_messages, _out_size, _out_num_messages of pooled
        // sockets are counted into the corresponding fields in their _main_socket.
        kutil::atomic<size_t> in_size;
        kutil::atomic<size_t> in_num_messages;
        kutil::atomic<size_t> out_size;
        kutil::atomic<size_t> out_num_messages;

        // For computing stats. Lazily allocated by UpdateStatsEverySecond(),
        // owned (deleted) by ~SharedPart().
        ExtendedSocketStat *extended_stat;

        CircuitBreaker circuit_breaker;

        kutil::atomic<uint64_t> recent_error_count;

        explicit SharedPart(SocketId creator_socket_id);

        ~SharedPart();

        // Call this method every second (roughly)
        void UpdateStatsEverySecond(int64_t now_ms);
    };

    // Zero-initializes all counters; `socket_pool' and `extended_stat' start
    // NULL and are created lazily.
    Socket::SharedPart::SharedPart(SocketId creator_socket_id2)
            : socket_pool(NULL), creator_socket_id(creator_socket_id2), num_continuous_connect_timeouts(0), in_size(0),
              in_num_messages(0), out_size(0), out_num_messages(0), extended_stat(NULL), recent_error_count(0) {
    }

    // Releases the lazily-created members: the pooled-socket container and
    // the extended statistics.
    Socket::SharedPart::~SharedPart() {
        SocketPool *const pool =
                socket_pool.exchange(NULL, kutil::memory_order_relaxed);
        delete pool;
        delete extended_stat;
        extended_stat = NULL;
    }

    // Derives per-second and sliding per-minute traffic stats from the
    // cumulative atomic counters. Expected to be called roughly once per
    // second (see the normalization note below).
    void Socket::SharedPart::UpdateStatsEverySecond(int64_t now_ms) {
        ExtendedSocketStat *stat = extended_stat;
        if (stat == NULL) {
            // Lazy allocation; on OOM just skip this round of stats.
            stat = new(std::nothrow) ExtendedSocketStat;
            if (stat == NULL) {
                return;
            }
            extended_stat = stat;
        }

        // Save volatile counters.
        const size_t in_sz = in_size.load(kutil::memory_order_relaxed);
        const size_t in_nmsg = in_num_messages.load(kutil::memory_order_relaxed);
        const size_t out_sz = out_size.load(kutil::memory_order_relaxed);
        const size_t out_nmsg = out_num_messages.load(kutil::memory_order_relaxed);

        // Notice that we don't normalize any data, mainly because normalization
        // often make data inaccurate and confuse users. This assumes that this
        // function is called exactly every second. This may not be true when the
        // running machine gets very busy. TODO(gejun): Figure out a method to
        // selectively normalize data when the calling interval is far from 1 second.
        stat->in_size_s = in_sz - stat->last_in_size;
        stat->in_num_messages_s = in_nmsg - stat->last_in_num_messages;
        stat->out_size_s = out_sz - stat->last_out_size;
        stat->out_num_messages_s = out_nmsg - stat->last_out_num_messages;

        stat->last_in_size = in_sz;
        stat->last_in_num_messages = in_nmsg;
        stat->last_out_size = out_sz;
        stat->last_out_num_messages = out_nmsg;

        // Maintain sliding per-minute sums: add this second's sample (only
        // when non-zero) and subtract whatever sample aged out of the window.
        ExtendedSocketStat::Sampled popped;
        if (stat->in_size_s |/*bitwise or*/
            stat->in_num_messages_s |
            stat->out_size_s |
            stat->out_num_messages_s) {
            ExtendedSocketStat::Sampled s = {
                    stat->in_size_s, stat->in_num_messages_s,
                    stat->out_size_s, stat->out_num_messages_s
            };
            stat->in_size_m += s.in_size_s;
            stat->in_num_messages_m += s.in_num_messages_s;
            stat->out_size_m += s.out_size_s;
            stat->out_num_messages_m += s.out_num_messages_s;
            if (stat->_minute_counter.Add(now_ms, s, &popped)) {
                stat->in_size_m -= popped.in_size_s;
                stat->in_num_messages_m -= popped.in_num_messages_s;
                stat->out_size_m -= popped.out_size_s;
                stat->out_num_messages_m -= popped.out_num_messages_s;
            }
        }
        // Expire old samples even in idle seconds when the branch above
        // was skipped entirely.
        while (stat->_minute_counter.TryPop(now_ms, &popped)) {
            stat->in_size_m -= popped.in_size_s;
            stat->in_num_messages_m -= popped.in_num_messages_s;
            stat->out_size_m -= popped.out_size_s;
            stat->out_num_messages_m -= popped.out_num_messages_s;
        }
    }

    // Process-wide collector of socket statistics. Created at most once via
    // pthread_once (see Socket::CreateVarsOnce) and intentionally never freed.
    SocketVarsCollector *g_vars = NULL;

    static pthread_once_t s_create_vars_once = PTHREAD_ONCE_INIT;

    // pthread_once callback allocating the global collector.
    static void CreateVars() {
        g_vars = new SocketVarsCollector;
    }

    // Thread-safe, idempotent initialization of g_vars.
    void Socket::CreateVarsOnce() {
        CHECK_EQ(0, pthread_once(&s_create_vars_once, CreateVars));
    }

// Used by ConnectionService
    // Number of client-side (channel) connections currently tracked.
    // Returns 0 before the global collector has been created.
    int64_t GetChannelConnectionCount() {
        return (g_vars != NULL) ? g_vars->channel_conn.get_value() : 0;
    }

    // True if the user of this socket is the client-side messenger, i.e. the
    // socket was created by an outgoing connect.
    bool Socket::CreatedByConnect() const {
        return _user == static_cast<SocketUser *>(get_client_side_messenger());
    }

    // Sentinel (non-dereferenceable) user-message pointer; WriteRequest code
    // compares against it and never calls through it.
    SocketMessage *const DUMMY_USER_MESSAGE = (SocketMessage *) 0x1;
    // The pipelined count is packed into 14 bits of WriteRequest's extra word
    // (mask 0x3FFF in pipelined_count()), hence the 2^14 cap.
    const uint32_t MAX_PIPELINED_COUNT = 16384;

    // One node of a socket's lock-free write queue. Two PackedPtr fields
    // carry extra bits alongside the pointers:
    //   _socket_and_control_bits: Socket* + bit1 = notify_on_success,
    //                             bit0 = shutdown_write.
    //   _pc_and_udmsg:            SocketMessage* + bits 0-13 = pipelined
    //                             count, bits 14-15 = auth flags.
    struct KRPC_CACHELINE_ALIGNMENT Socket::WriteRequest {
        // Sentinel `next' value marking a not-yet-connected queue (defined
        // below this struct).
        static WriteRequest *const UNCONNECTED;

        kutil::IOBuf data;
        WriteRequest *next;
        kthread_id_t id_wait;

        // Pack both flags into the extra bits, clobbering previous bits.
        void clear_and_set_control_bits(bool notify_on_success,
                                        bool shutdown_write) {
            _socket_and_control_bits.set_extra(
                    (uint16_t) notify_on_success << 1 | (uint16_t) shutdown_write);
        }

        void set_socket(Socket *s) {
            _socket_and_control_bits.set(s);
        }

        // If this field is set to true, notify when write successfully.
        bool is_notify_on_success() const {
            return _socket_and_control_bits.extra() & ((uint16_t) 1 << 1);
        }

        // Whether shutdown write of the socket after this write complete.
        bool need_shutdown_write() const {
            return _socket_and_control_bits.extra() & (uint16_t) 1;
        }

        Socket *get_socket() const {
            return _socket_and_control_bits.get();
        }

        // Low 14 bits of the extra word (consistent with MAX_PIPELINED_COUNT).
        uint32_t pipelined_count() const {
            return _pc_and_udmsg.extra() & 0x3FFF;
        }

        // Bits 14-15 of the extra word.
        uint32_t get_auth_flags() const {
            return (_pc_and_udmsg.extra() >> 14) & 0x03;
        }

        void clear_pipelined_count_and_auth_flags() {
            _pc_and_udmsg.reset_extra();
        }

        SocketMessage *user_message() const {
            return _pc_and_udmsg.get();
        }

        void clear_user_message() {
            _pc_and_udmsg.reset();
        }

        // Store message pointer plus packed count/flags in one shot.
        // NOTE: `pc' is assumed to fit in 14 bits; higher bits would collide
        // with the auth flags.
        void set_pipelined_count_and_user_message(
                uint32_t pc, SocketMessage *msg, uint32_t auth_flags) {
            if (auth_flags) {
                pc |= (auth_flags & 0x03) << 14;
            }
            _pc_and_udmsg.set_ptr_and_extra(msg, pc);
        }

        // Destroy a pending user message (if any) and clear the packed word.
        // Returns true iff there was a user message.
        bool reset_pipelined_count_and_user_message() {
            SocketMessage *msg = user_message();
            if (msg) {
                if (msg != DUMMY_USER_MESSAGE) {
                    kutil::IOBuf dummy_buf;
                    // We don't care about the return value since the request
                    // is already failed.
                    (void) msg->AppendAndDestroySelf(&dummy_buf, NULL);
                }
                set_pipelined_count_and_user_message(0, NULL, 0);
                return true;
            }
            return false;
        }

        // Register pipelined_count and user_message
        void Setup(Socket *s);

    private:
        // Socket pointer and some control bits.
        PackedPtr<Socket> _socket_and_control_bits;
        // User message pointer, pipelined count auth flag.
        PackedPtr<SocketMessage> _pc_and_udmsg;
    };

    // Materializes a pending user message into `data' and registers the
    // pipelined info with the socket. Called before the request is written.
    void Socket::WriteRequest::Setup(Socket *s) {
        SocketMessage *msg = user_message();
        if (msg) {
            clear_user_message();
            if (msg != DUMMY_USER_MESSAGE) {
                // AppendAndDestroySelf() consumes `msg' regardless of result.
                kutil::Status st = msg->AppendAndDestroySelf(&data, s);
                if (!st.ok()) {
                    // Abandon the request.
                    data.clear();
                    kthread_id_error2(id_wait, st.error_code(), st.error_cstr());
                    return;
                }
            }
            // Account the bytes about to be queued; mark the socket
            // overcrowded once the unwritten backlog exceeds the flag limit.
            const int64_t before_write =
                    s->_unwritten_bytes.fetch_add(data.size(), kutil::memory_order_relaxed);
            if (before_write + (int64_t) data.size() >= turbo::get_flag(FLAGS_socket_max_unwritten_bytes)) {
                s->_overcrowded = true;
            }
        }
        const uint32_t pc = pipelined_count();
        if (pc) {
            // For positional correspondence between responses and requests,
            // which is common in cache servers: memcache, redis...
            // The struct will be popped when reading a message from the socket.
            PipelinedInfo pi;
            pi.count = pc;
            pi.auth_flags = get_auth_flags();
            pi.id_wait = id_wait;
            clear_pipelined_count_and_auth_flags(); // avoid being pushed again
            s->PushPipelinedInfo(pi);
        }
    }

    // Sentinel value distinct from NULL and any real request; only compared
    // against, never dereferenced.
    Socket::WriteRequest *const Socket::WriteRequest::UNCONNECTED =
            (Socket::WriteRequest *) (intptr_t) -1;

    // A SocketUser carrying a one-shot EPOLLOUT callback plus a timeout
    // timer; deletes itself when the owning socket recycles it.
    class Socket::EpollOutRequest : public SocketUser {
    public:
        EpollOutRequest() : fd(-1), timer_id(0), on_epollout_event(NULL), data(NULL) {}

        ~EpollOutRequest() override {
            // Remove the timer at last inside destructor to avoid
            // race with the place that registers the timer
            if (timer_id) {
                kthread_timer_del(timer_id);
                timer_id = 0;
            }
        }

        void BeforeRecycle(Socket *) override {
            // Recycle itself.
            delete this;
        }

        // The fd being watched for writability.
        int fd;
        // Timeout timer; 0 means not registered yet.
        kthread_timer_t timer_id;

        // Callback invoked with the fd, an errno-style code and `data'.
        int (*on_epollout_event)(int fd, int err, void *data);

        void *data;
    };

    // Flag bit kept above the low 32 bits of a 64-bit word (users are outside
    // this chunk — NOTE(review): confirm at the call sites).
    // FIX: use 1ULL — `1ul << 32' is undefined behavior on platforms where
    // unsigned long is 32 bits wide (32-bit Linux, 64-bit Windows).
    static const uint64_t AUTH_FLAG = (1ULL << 32);

    // Constructor is only reachable through the resource pool (Forbidden tag);
    // real initialization happens in OnCreated().
    Socket::Socket(Forbidden f)
    // must be even because Address() relies on evenness of version
            : VersionedRefWithId<Socket>(f), _shared_part(NULL), _nevent(0), _keytable_pool(NULL), _fd(-1), _tos(0),
              _reset_fd_real_us(-1), _on_edge_triggered_events(NULL), _user(NULL), _conn(NULL), _preferred_index(-1),
              _hc_count(0), _last_msg_size(0), _avg_msg_size(0), _last_readtime_us(0), _parsing_context(NULL),
              _correlation_id(0), _health_check_interval_s(-1), _is_hc_related_ref_held(false), _hc_started(false),
              _ninprocess(1), _auth_flag_error(0), _auth_id(INVALID_KTHREAD_ID), _auth_context(NULL),
              _ssl_state(SSL_UNKNOWN), _ssl_session(NULL), _rdma_ep(NULL), _rdma_state(RDMA_OFF),
              _connection_type_for_progressive_read(CONNECTION_TYPE_UNKNOWN), _controller_released_socket(false),
              _overcrowded(false), _fail_me_at_server_stop(false), _logoff_flag(false), _error_code(0),
              _pipeline_q(NULL), _last_writetime_us(0), _unwritten_bytes(0), _epollout_butex(NULL), _write_head(NULL),
              _is_write_shutdown(false), _stream_set(NULL), _total_streams_unconsumed_size(0),
              _ninflight_app_health_check(0), _http_request_method(HTTP_METHOD_GET) {
        CreateVarsOnce();
        // These two live for the whole lifetime of the pooled object and are
        // released in ~Socket().
        pthread_mutex_init(&_id_wait_list_mutex, NULL);
        _epollout_butex = kthread::butex_create_checked<kutil::atomic<int> >();
    }

    // Releases the resources acquired in the constructor.
    Socket::~Socket() {
        pthread_mutex_destroy(&_id_wait_list_mutex);
        kthread::butex_destroy(_epollout_butex);
    }

    // Recycles a fully-written WriteRequest and wakes the waiter (if any).
    void Socket::ReturnSuccessfulWriteRequest(Socket::WriteRequest *p) {
        DCHECK(p->data.empty());
        AddOutputMessages(1);
        // FIX: read every needed field of `p' BEFORE returning it to the
        // object pool. The original code called p->is_notify_on_success()
        // after kutil::return_object(p) — a use-after-free, since the pooled
        // memory may be immediately reused by another thread.
        const kthread_id_t id_wait = p->id_wait;
        const bool notify_on_success = p->is_notify_on_success();
        kutil::return_object(p);
        if (id_wait != INVALID_KTHREAD_ID) {
            if (notify_on_success && !Failed()) {
                // Wake the waiter with "success".
                kthread_id_error(id_wait, 0);
            } else {
                NotifyOnFailed(id_wait);
            }
        }
    }

    // Fails and recycles one WriteRequest: destroys its pending user message
    // (or un-accounts its queued bytes), then signals the waiter with the
    // given error. All fields are read before the object is returned.
    void Socket::ReturnFailedWriteRequest(Socket::WriteRequest *p, int error_code,
                                          const std::string &error_text) {
        if (!p->reset_pipelined_count_and_user_message()) {
            // No user message: the bytes were already counted into
            // _unwritten_bytes, undo that.
            CancelUnwrittenBytes(p->data.size());
        }
        p->data.clear();  // data is probably not written.
        const kthread_id_t id_wait = p->id_wait;
        kutil::return_object(p);
        if (id_wait != INVALID_KTHREAD_ID) {
            kthread_id_error2(id_wait, error_code, error_text);
        }
    }

    // Fails every request in the singly-linked list except the final node,
    // which is returned untouched to the caller. The successor pointer is
    // saved before each node is recycled.
    Socket::WriteRequest *Socket::ReleaseWriteRequestsExceptLast(
            Socket::WriteRequest *req, int error_code, const std::string &error_text) {
        for (WriteRequest *cur = req;;) {
            WriteRequest *const follower = cur->next;
            if (follower == NULL) {
                return cur;
            }
            ReturnFailedWriteRequest(cur, error_code, error_text);
            cur = follower;
        }
    }

    // Drains and fails the whole write queue of a failed (or write-shutdown)
    // socket, notifying each waiter with the appropriate error.
    void Socket::ReleaseAllFailedWriteRequests(Socket::WriteRequest *req) {
        CHECK(Failed() || IsWriteShutdown());
        int error_code;
        std::string error_text;
        if (Failed()) {
            // _error_code/_error_text are guarded by _id_wait_list_mutex
            // (see OnFailed), so copy them under the lock.
            pthread_mutex_lock(&_id_wait_list_mutex);
            error_code = non_zero_error_code();
            error_text = _error_text;
            pthread_mutex_unlock(&_id_wait_list_mutex);
        } else {
            error_code = ESHUTDOWNWRITE;
            error_text = "Shutdown write of the socket";
        }
        // Notice that `req' is not tail if Address after IsWriteComplete fails.
        // Loop because new requests may still be appended concurrently until
        // IsWriteComplete() observes the queue as fully drained.
        do {
            req = ReleaseWriteRequestsExceptLast(req, error_code, error_text);
            if (!req->reset_pipelined_count_and_user_message()) {
                CancelUnwrittenBytes(req->data.size());
            }
            req->data.clear();  // MUST, otherwise IsWriteComplete is false
        } while (!IsWriteComplete(req, true, NULL));
        ReturnFailedWriteRequest(req, error_code, error_text);
    }

    // Installs `fd' as this socket's descriptor and configures it:
    // non-blocking, close-on-exec, no-delay, optional TOS/SNDBUF/RCVBUF/
    // keepalive, then registers it with the event dispatcher when edge-
    // triggered callbacks are in use. Returns 0 on success, -1 on failure
    // (with _fd reset to -1). An invalid fd is accepted and simply stored.
    int Socket::ResetFileDescriptor(int fd) {
        // Reset message sizes when fd is changed.
        _last_msg_size = 0;
        _avg_msg_size = 0;
        // MUST store `_fd' before adding itself into epoll device to avoid
        // race conditions with the callback function inside epoll
        _fd.store(fd, kutil::memory_order_release);
        _reset_fd_real_us = kutil::gettimeofday_us();
        if (!ValidFileDescriptor(fd)) {
            return 0;
        }
        if (_remote_side == kutil::EndPoint()) {
            // OK to fail, non-socket fd does not support this.
            kutil::get_remote_side(fd, &_remote_side);
        }
        // OK to fail, non-socket fd does not support this.
        kutil::get_local_side(fd, &_local_side);

        // FIXME : close-on-exec should be set by new syscalls or worse: set right
        // after fd-creation syscall. Setting at here has higher probabilities of
        // race condition.
        kutil::make_close_on_exec(fd);

        // Make the fd non-blocking.
        if (kutil::make_non_blocking(fd) != 0) {
            PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking";
            _fd.store(-1, kutil::memory_order_release);
            return -1;
        }
        // turn off nagling.
        // OK to fail, namely unix domain socket does not support this.
        kutil::make_no_delay(fd);
        if (_tos > 0 &&
            setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) != 0) {
            PLOG(ERROR) << "Fail to set tos of fd=" << fd << " to " << _tos;
        }

        // Optional send-buffer size override (best-effort, logged on failure).
        if (turbo::get_flag(FLAGS_socket_send_buffer_size) > 0) {
            int buff_size = turbo::get_flag(FLAGS_socket_send_buffer_size);
            if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, sizeof(buff_size)) != 0) {
                PLOG(ERROR) << "Fail to set sndbuf of fd=" << fd << " to "
                            << buff_size;
            }
        }

        // Optional receive-buffer size override (best-effort).
        if (turbo::get_flag(FLAGS_socket_recv_buffer_size) > 0) {
            int buff_size = turbo::get_flag(FLAGS_socket_recv_buffer_size);
            if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, sizeof(buff_size)) != 0) {
                PLOG(ERROR) << "Fail to set rcvbuf of fd=" << fd << " to "
                            << buff_size;
            }
        }

        EnableKeepaliveIfNeeded(fd);

        if (_on_edge_triggered_events) {
            if (_io_event.AddConsumer(fd) != 0) {
                PLOG(ERROR) << "Fail to add SocketId=" << id()
                            << " into EventDispatcher";
                _fd.store(-1, kutil::memory_order_release);
                return -1;
            }
        }
        return 0;
    }

    // Applies SO_KEEPALIVE plus the platform-specific idle/interval/count
    // tuning from _keepalive_options. All failures are logged, never fatal.
    void Socket::EnableKeepaliveIfNeeded(int fd) {
        if (!_keepalive_options) {
            return;
        }

        int keepalive = 1;
        if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
                       sizeof(keepalive)) != 0) {
            // Without SO_KEEPALIVE the fine-grained options are meaningless.
            PLOG(ERROR) << "Fail to set keepalive of fd=" << fd;
            return;
        }

#if defined(OS_LINUX)
        // Linux spells these SOL_TCP / TCP_KEEPIDLE.
        if (_keepalive_options->keepalive_idle_s > 0) {
            if (setsockopt(fd, SOL_TCP, TCP_KEEPIDLE,
                           &_keepalive_options->keepalive_idle_s,
                           sizeof(_keepalive_options->keepalive_idle_s)) != 0) {
                PLOG(ERROR) << "Fail to set keepidle of fd=" << fd;
            }
        }

        if (_keepalive_options->keepalive_interval_s > 0) {
            if (setsockopt(fd, SOL_TCP, TCP_KEEPINTVL,
                           &_keepalive_options->keepalive_interval_s,
                           sizeof(_keepalive_options->keepalive_interval_s)) != 0) {
                PLOG(ERROR) << "Fail to set keepintvl of fd=" << fd;
            }
        }

        if (_keepalive_options->keepalive_count > 0) {
            if (setsockopt(fd, SOL_TCP, TCP_KEEPCNT,
                           &_keepalive_options->keepalive_count,
                           sizeof(_keepalive_options->keepalive_count)) != 0) {
                PLOG(ERROR) << "Fail to set keepcnt of fd=" << fd;
            }
        }
#elif defined(OS_MACOSX)
        // macOS uses IPPROTO_TCP and calls the idle option TCP_KEEPALIVE.
        if (_keepalive_options->keepalive_idle_s > 0) {
            if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE,
                           &_keepalive_options->keepalive_idle_s,
                           sizeof(_keepalive_options->keepalive_idle_s)) != 0) {
                PLOG(ERROR) << "Fail to set keepidle of fd=" << fd;
            }
        }

        if (_keepalive_options->keepalive_interval_s > 0) {
            if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL,
                           &_keepalive_options->keepalive_interval_s,
                           sizeof(_keepalive_options->keepalive_interval_s)) != 0) {
                PLOG(ERROR) << "Fail to set keepintvl of fd=" << fd;
            }
        }

        if (_keepalive_options->keepalive_count > 0) {
            if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT,
                           &_keepalive_options->keepalive_count,
                           sizeof(_keepalive_options->keepalive_count)) != 0) {
                PLOG(ERROR) << "Fail to set keepcnt of fd=" << fd;
            }
        }
#endif
    }

// SocketId = 32-bit version + 32-bit slot.
//   version: from version part of _versioned_nref, must be an EVEN number.
//   slot: designated by ResourcePool.
    // Allocates a socket from the versioned resource pool; `*id' receives the
    // 64-bit SocketId. Field initialization from `options' happens in
    // OnCreated() (presumably invoked by the base-class Create).
    int Socket::Create(const SocketOptions &options, SocketId *id) {
        return VersionedRefWithId<Socket>::Create(id, options);
    }

    // Initializes all fields of this (possibly recycled) Socket from
    // `options'. Returns 0 on success; on any failure SetFailed() has been
    // called, the IOEvent is reset by the scope guard, and -1 is returned.
    int Socket::OnCreated(const SocketOptions &options) {
        if (_io_event.Init((void *) id()) != 0) {
            LOG(ERROR) << "Fail to init IOEvent";
            SetFailed(ENOMEM, "%s", "Fail to init IOEvent");
            return -1;
        }
        _io_event.set_kthread_tag(options.kthread_tag);
        // Roll back the IOEvent on any early return below.
        auto guard = kutil::MakeScopeGuard([this] {
            _io_event.Reset();
        });

        g_vars->nsocket << 1;
        CHECK(NULL == _shared_part.load(kutil::memory_order_relaxed));
        _nevent.store(0, kutil::memory_order_relaxed);
        _keytable_pool = options.keytable_pool;
        _tos = 0;
        _remote_side = options.remote_side;
        _local_side = kutil::EndPoint();
        _on_edge_triggered_events = options.on_edge_triggered_events;
        _user = options.user;
        _conn = options.conn;
        _app_connect = options.app_connect;
        _preferred_index = -1;
        _hc_count = 0;
        CHECK(_read_buf.empty());
        const int64_t cpuwide_now = kutil::cpuwide_time_us();
        _last_readtime_us.store(cpuwide_now, kutil::memory_order_relaxed);
        reset_parsing_context(options.initial_parsing_context);
        _correlation_id = 0;
        _health_check_interval_s = options.health_check_interval_s;
        _is_hc_related_ref_held = false;
        _hc_started.store(false, kutil::memory_order_relaxed);
        _ninprocess.store(1, kutil::memory_order_relaxed);
        _auth_flag_error.store(0, kutil::memory_order_relaxed);
        const int rc2 = kthread_id_create(&_auth_id, NULL, NULL);
        if (rc2) {
            LOG(ERROR) << "Fail to create auth_id: " << berror(rc2);
            SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
            return -1;
        }
        _force_ssl = options.force_ssl;
        // Disable SSL check if there is no SSL context
        _ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
        _ssl_session = NULL;
        _ssl_ctx = options.initial_ssl_ctx;
#if KRPC_WITH_RDMA
        CHECK(_rdma_ep == NULL);
        if (options.use_rdma) {
            // FIX: was `new (std::nothrow)rdma::RdmaEndpoint(m)' -- `m' is
            // not declared in this member function (leftover from a static
            // factory); the endpoint belongs to this socket.
            _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(this);
            if (!_rdma_ep) {
                const int saved_errno = errno;
                PLOG(ERROR) << "Fail to create RdmaEndpoint";
                SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
                             berror(saved_errno));
                return -1;
            }
            _rdma_state = RDMA_UNKNOWN;
        } else {
            _rdma_state = RDMA_OFF;
        }
#endif
        _connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
        _controller_released_socket.store(false, kutil::memory_order_relaxed);
        _overcrowded = false;
        // Maybe non-zero for RTMP connections.
        _fail_me_at_server_stop = false;
        _logoff_flag.store(false, kutil::memory_order_relaxed);
        _error_code = 0;
        _agent_socket_id.store(INVALID_SOCKET_ID, kutil::memory_order_relaxed);
        _total_streams_unconsumed_size.store(0, kutil::memory_order_relaxed);
        _ninflight_app_health_check.store(0, kutil::memory_order_relaxed);
        // NOTE: last two params are useless in kthread > r32787
        const int rc = kthread_id_list_init(&_id_wait_list, 512, 512);
        if (rc) {
            LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc);
            SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
            return -1;
        }
        _last_writetime_us.store(cpuwide_now, kutil::memory_order_relaxed);
        _unwritten_bytes.store(0, kutil::memory_order_relaxed);
        _keepalive_options = options.keepalive_options;
        CHECK(NULL == _write_head.load(kutil::memory_order_relaxed));
        _is_write_shutdown = false;
        int fd = options.fd;
        if (!ValidFileDescriptor(fd) && options.connect_on_create) {
            // Connect on create.
            fd = DoConnect(options.connect_abstime, NULL, NULL);
            if (fd < 0) {
                PLOG(ERROR) << "Fail to connect to " << options.remote_side;
                int error_code = errno != 0 ? errno : EHOSTDOWN;
                SetFailed(error_code, "Fail to connect to %s: %s",
                          kutil::endpoint2str(options.remote_side).c_str(),
                          berror(error_code));
                return -1;
            }
        }
        // Must be the last one! Internal fields of this Socket may be accessed
        // just after calling ResetFileDescriptor.
        // FIX: pass the local `fd' (possibly produced by DoConnect above),
        // not `options.fd' -- the latter discarded and leaked the freshly
        // connected descriptor whenever connect_on_create took effect.
        if (ResetFileDescriptor(fd) != 0) {
            const int saved_errno = errno;
            PLOG(ERROR) << "Fail to ResetFileDescriptor";
            SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
                      berror(saved_errno));
            return -1;
        }
        HoldHCRelatedRef();
        guard.dismiss();

        return 0;
    }

    // Tears down all per-connection state before this pooled object is
    // recycled: notifies user/conn/app_connect hooks, releases the shared
    // part, closes the fd, and frees every lazily-created member.
    void Socket::BeforeRecycled() {
        const bool create_by_connect = CreatedByConnect();
        if (_app_connect) {
            // Swap out first so StopConnect sees a socket without an
            // app-connect in progress.
            std::shared_ptr<AppConnect> tmp;
            _app_connect.swap(tmp);
            tmp->StopConnect(this);
        }
        if (_conn) {
            SocketConnection *const saved_conn = _conn;
            _conn = NULL;
            saved_conn->BeforeRecycle(this);
        }
        if (_user) {
            SocketUser *const saved_user = _user;
            _user = NULL;
            saved_user->BeforeRecycle(this);
        }
        SharedPart *sp = _shared_part.exchange(NULL, kutil::memory_order_acquire);
        if (sp) {
            sp->RemoveRefManually();
        }

        // Detach from the dispatcher before closing, then close the fd.
        const int prev_fd = _fd.exchange(-1, kutil::memory_order_relaxed);
        if (ValidFileDescriptor(prev_fd)) {
            if (_on_edge_triggered_events != NULL) {
                _io_event.RemoveConsumer(prev_fd);
            }
            close(prev_fd);
            if (create_by_connect) {
                g_vars->channel_conn << -1;
            }
        }
        _io_event.Reset();

#if KRPC_WITH_RDMA
        if (_rdma_ep) {
            delete _rdma_ep;
            _rdma_ep = NULL;
            _rdma_state = RDMA_UNKNOWN;
        }
#endif

        reset_parsing_context(NULL);
        _read_buf.clear();

        _auth_flag_error.store(0, kutil::memory_order_relaxed);
        // Wake any waiter on the auth id before destroying the wait list.
        kthread_id_error(_auth_id, 0);

        kthread_id_list_destroy(&_id_wait_list);

        if (_ssl_session) {
            SSL_free(_ssl_session);
            _ssl_session = NULL;
        }

        _ssl_ctx = NULL;

        delete _pipeline_q;
        _pipeline_q = NULL;

        delete _auth_context;
        _auth_context = NULL;

        delete _stream_set;
        _stream_set = NULL;

        // Drop the extra reference held on the agent socket, if any.
        const SocketId asid = _agent_socket_id.load(kutil::memory_order_relaxed);
        if (asid != INVALID_SOCKET_ID) {
            SocketUniquePtr ptr;
            if (Socket::Address(asid, &ptr) == 0) {
                ptr->ReleaseAdditionalReference();
            }
        }
        g_vars->nsocket << -1;
    }

    // Hook invoked after this socket has been marked failed. Records the
    // error under the wait-list mutex, starts health checking (at most one
    // instance, guarded by a CAS on _hc_started) and wakes every thread
    // blocked on EPOLLOUT or on unresponded RPCs.
    void Socket::OnFailed(int error_code, const std::string &error_text) {
        // Update _error_text
        pthread_mutex_lock(&_id_wait_list_mutex);
        _error_code = error_code;
        _error_text = error_text;
        pthread_mutex_unlock(&_id_wait_list_mutex);

        // Do health-checking even if we're not connected before, needed
        // by Channel to revive never-connected socket when server side
        // comes online.
        if (HCEnabled()) {
            bool expect = false;
            // Only the thread that flips _hc_started false->true launches
            // the health-checking task.
            if (_hc_started.compare_exchange_strong(expect,
                                                    true,
                                                    kutil::memory_order_relaxed,
                                                    kutil::memory_order_relaxed)) {
                GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
                StartHealthCheck(id(),
                                 GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
            } else {
                // No need to run 2 health checking at the same time.
                RPC_VLOG << "There is already a health checking running "
                            "for SocketId=" << id();
            }
        }
        // Wake up all threads waiting on EPOLLOUT when closing fd
        _epollout_butex->fetch_add(1, kutil::memory_order_relaxed);
        kthread::butex_wake_all(_epollout_butex);

        // Wake up all unresponded RPC.
        CHECK_EQ(0, kthread_id_list_reset2_pthreadsafe(
                &_id_wait_list, error_code, error_text,
                &_id_wait_list_mutex));
        ResetAllStreams(error_code, error_text);
        // _app_connect shouldn't be set to NULL in SetFailed otherwise
        // HC is always not supported.
        // FIXME: Design a better interface for AppConnect
        // if (_app_connect) {
        //     AppConnect* const saved_app_connect = _app_connect;
        //     _app_connect = NULL;
        //     saved_app_connect->StopConnect(this);
        // }
    }

    void Socket::AfterRevived() {
        if (_user) {
            _user->AfterRevived(this);
        } else {
            LOG(INFO) << "Revived " << description() << " (Connectable)";
        }
    }

    // Builds the human-readable description of this socket:
    // "fd=<fd> addr=<remote>[:<local_port>]".
    // NOTE: The output of `description()' should be consistent with operator<<()
    std::string Socket::OnDescription() const {
        std::string desc;
        desc.reserve(64);
        const int cur_fd = fd();
        if (cur_fd >= 0) {
            kutil::string_appendf(&desc, "fd=%d", cur_fd);
        }
        kutil::string_appendf(&desc, " addr=%s",
                              kutil::endpoint2str(remote_side()).c_str());
        const int lport = local_side().port;
        if (lport > 0) {
            kutil::string_appendf(&desc, ":%d", lport);
        }
        return desc;
    }

    // Takes an extra reference for health checking; only sockets with a
    // positive health-check interval participate.
    void Socket::HoldHCRelatedRef() {
        if (_health_check_interval_s <= 0) {
            return;
        }
        _is_hc_related_ref_held = true;
        AddReference();
    }

    // Releases the reference taken by HoldHCRelatedRef().
    void Socket::ReleaseHCRelatedReference() {
        if (_health_check_interval_s <= 0) {
            return;
        }
        _is_hc_related_ref_held = false;
        Dereference();
    }

    // Used by health checking: waits until the reference count drops to
    // `expected_nref', then closes the old fd and restores per-connection
    // state so the socket can be revived. Returns 0 on success, -1 when
    // the wait becomes pointless (socket revived/recycled/abandoned, or
    // nobody holds a health-checking-related reference).
    int Socket::WaitAndReset(int32_t expected_nref) {
        const uint32_t id_ver = VersionOfVRefId(id());
        uint64_t vref;
        // Wait until nref == expected_nref.
        while (true) {
            // The acquire fence pairs with release fence in Dereference to avoid
            // inconsistent states to be seen by others.
            vref = versioned_ref();
            // id_ver + 1 is the "failed" version; anything else means the
            // socket was revived or already recycled.
            if (VersionOfVRef(vref) != id_ver + 1) {
                LOG(WARNING) << "SocketId=" << id() << " is already alive or recycled";
                return -1;
            }
            if (NRefOfVRef(vref) > expected_nref) {
                if (kthread_usleep(1000L/*FIXME*/) < 0) {
                    PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep";
                    return -1;
                }
            } else if (NRefOfVRef(vref) < expected_nref) {
                RPC_VLOG << "SocketId=" << id()
                         << " was abandoned during health checking";
                return -1;
            } else {
                // nobody holds a health-checking-related reference,
                // so no need to do health checking.
                if (!_is_hc_related_ref_held) {
                    RPC_VLOG << "Nobody holds a health-checking-related reference"
                             << " for SocketId=" << id();
                    return -1;
                }

                break;
            }
        }

        // It's safe to close previous fd (provided expected_nref is correct).
        const int prev_fd = _fd.exchange(-1, kutil::memory_order_relaxed);
        if (ValidFileDescriptor(prev_fd)) {
            // Detach from the event dispatcher before closing.
            if (_on_edge_triggered_events != NULL) {
                _io_event.RemoveConsumer(prev_fd);
            }
            close(prev_fd);
            if (CreatedByConnect()) {
                g_vars->channel_conn << -1;
            }
        }

#if KRPC_WITH_RDMA
        if (_rdma_ep) {
            _rdma_ep->Reset();
            _rdma_state = RDMA_UNKNOWN;
        }
#endif

        _local_side = kutil::EndPoint();
        if (_ssl_session) {
            SSL_free(_ssl_session);
            _ssl_session = NULL;
        }
        _ssl_state = SSL_UNKNOWN;
        _nevent.store(0, kutil::memory_order_relaxed);
        // parsing_context is very likely to be associated with the fd,
        // removing it is a safer choice and required by http2.
        reset_parsing_context(NULL);
        // Must clear _read_buf otherwise even if the connections is recovered,
        // the kept old data is likely to make parsing fail.
        _read_buf.clear();
        _ninprocess.store(1, kutil::memory_order_relaxed);
        _auth_flag_error.store(0, kutil::memory_order_relaxed);
        kthread_id_error(_auth_id, 0);
        const int rc = kthread_id_create(&_auth_id, NULL, NULL);
        if (rc != 0) {
            LOG(FATAL) << "Fail to create _auth_id, " << berror(rc);
            return -1;
        }

        // Treat the reset moment as fresh activity so the idle checker does
        // not immediately close the revived connection.
        const int64_t cpuwide_now = kutil::cpuwide_time_us();
        _last_readtime_us.store(cpuwide_now, kutil::memory_order_relaxed);
        _last_writetime_us.store(cpuwide_now, kutil::memory_order_relaxed);
        _logoff_flag.store(false, kutil::memory_order_relaxed);
        {
            KRPC_SCOPED_LOCK(_pipeline_mutex);
            if (_pipeline_q) {
                _pipeline_q->clear();
            }
        }

        SharedPart *sp = GetSharedPart();
        if (sp) {
            sp->circuit_breaker.Reset();
            sp->recent_error_count.store(0, kutil::memory_order_relaxed);
        }
        return 0;
    }

    void Socket::AddRecentError() {
        SharedPart *sp = GetSharedPart();
        if (sp) {
            sp->recent_error_count.fetch_add(1, kutil::memory_order_relaxed);
        }
    }

    int64_t Socket::recent_error_count() const {
        SharedPart *sp = GetSharedPart();
        if (sp) {
            return sp->recent_error_count.load(kutil::memory_order_relaxed);
        }
        return 0;
    }

    int Socket::isolated_times() const {
        SharedPart *sp = GetSharedPart();
        if (sp) {
            return sp->circuit_breaker.isolated_times();
        }
        return 0;
    }

    // Feeds one finished call into the circuit breaker; when it decides to
    // break, fail the main socket (only the winner of SetFailed logs).
    void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
        const bool still_healthy = GetOrNewSharedPart()->circuit_breaker
                .OnCallEnd(error_code, latency_us);
        if (still_healthy) {
            return;
        }
        if (SetFailed(main_socket_id()) == 0) {
            LOG(ERROR) << "Socket[" << *this << "] isolated by circuit breaker";
        }
    }

    int Socket::ReleaseReferenceIfIdle(int idle_seconds) {
        const int64_t last_active_us = last_active_time_us();
        if (kutil::cpuwide_time_us() - last_active_us <= idle_seconds * 1000000L) {
            return 0;
        }
        LOG_IF(WARNING, turbo::get_flag(FLAGS_log_idle_connection_close))
                        << "Close " << *this << " due to no data transmission for "
                        << idle_seconds << " seconds";
        if (shall_fail_me_at_server_stop()) {
            // sockets for streaming purposes (say RTMP) are probably referenced
            // by many places, ReleaseAdditionalReference() cannot notify other
            // places to release refs, SetFailed() is a must.
            return SetFailed(EUNUSED, "No data transmission for %d seconds",
                             idle_seconds);
        }
        return ReleaseAdditionalReference();
    }


    int Socket::SetFailed() {
        return SetFailed(EFAILEDSOCKET, NULL);
    }

    int Socket::SetFailed(int error_code, const char *error_fmt, ...) {
        std::string error_text;
        if (error_fmt != NULL) {
            va_list ap;
            va_start(ap, error_fmt);
            kutil::string_vprintf(&error_text, error_fmt, ap);
            va_end(ap);
        }
        return VersionedRefWithId<Socket>::SetFailed(error_code, error_text);
    }

    // Fails the socket addressed by `id'. Returns -1 when the id cannot
    // be resolved (already recycled).
    int Socket::SetFailed(SocketId id) {
        SocketUniquePtr ptr;
        if (Address(id, &ptr) == 0) {
            return ptr->SetFailed(EFAILEDSOCKET, NULL);
        }
        return -1;
    }

    // Arranges for kthread id `id' to be signalled when this socket fails.
    // If the socket already failed, signal immediately with the recorded
    // error. The mutex makes the Failed() check and the list insertion
    // atomic w.r.t. OnFailed().
    void Socket::NotifyOnFailed(kthread_id_t id) {
        pthread_mutex_lock(&_id_wait_list_mutex);
        if (!Failed()) {
            const int rc = kthread_id_list_add(&_id_wait_list, id);
            pthread_mutex_unlock(&_id_wait_list_mutex);
            if (rc != 0) {
                kthread_id_error(id, rc);
            }
        } else {
            // Copy the error under the lock, signal after unlocking.
            const int rc = non_zero_error_code();
            const std::string desc = _error_text;
            pthread_mutex_unlock(&_id_wait_list_mutex);
            kthread_id_error2(id, rc, desc);
        }
    }

// For unit-test.
// Reports the liveness of the socket identified by `id': 0 = alive,
// 1 = failed (version bumped by one), -1 = recycled/nonexistent.
// Optionally stores the current reference count into *nref.
    int Socket::Status(SocketId id, int32_t *nref) {
        const kutil::ResourceId<Socket> slot = SlotOfVRefId<Socket>(id);
        Socket *const sock = address_resource(slot);
        if (sock == NULL) {
            return -1;
        }
        const uint64_t vref = sock->versioned_ref();
        const uint32_t id_ver = VersionOfVRefId(id);
        if (VersionOfVRef(vref) == id_ver) {
            if (nref) {
                *nref = NRefOfVRef(vref);
            }
            return 0;
        }
        if (VersionOfVRef(vref) == id_ver + 1) {
            if (nref) {
                *nref = NRefOfVRef(vref);
            }
            return 1;
        }
        return -1;
    }

    void *Socket::ProcessEvent(void *arg) {
        // the enclosed Socket is valid and free to access inside this function.
        SocketUniquePtr s(static_cast<Socket *>(arg));
        s->_on_edge_triggered_events(s.get());
        return NULL;
    }

// Check if there're new requests appended.
// If yes, point old_head to reversed new requests and return false;
// If no:
//    old_head is fully written, set _write_head to NULL and return true;
//    old_head is not written yet, keep _write_head unchanged and return false;
// `old_head' is last new_head got from this function or (in another word)
// tail of current writing list.
// `singular_node' is true iff `old_head' is the only node in its list.
    bool Socket::IsWriteComplete(Socket::WriteRequest *old_head,
                                 bool singular_node,
                                 Socket::WriteRequest **new_tail) {
        CHECK(NULL == old_head->next);
        // Try to set _write_head to NULL to mark that the write is done.
        WriteRequest *new_head = old_head;
        WriteRequest *desired = NULL;
        bool return_when_no_more = true;
        if (!old_head->data.empty() || !singular_node) {
            // Keep _write_head pointing at old_head instead of clearing it.
            desired = old_head;
            // Write is obviously not complete if old_head is not fully written.
            return_when_no_more = false;
        }
        if (_write_head.compare_exchange_strong(
                new_head, desired, kutil::memory_order_acquire)) {
            // No one added new requests.
            if (new_tail) {
                *new_tail = old_head;
            }
            return return_when_no_more;
        }
        // CAS failed: new_head now holds the current head pushed by writers.
        CHECK_NE(new_head, old_head);
        // Above acquire fence pairs release fence of exchange in Write() to make
        // sure that we see all fields of requests set.

        // Someone added new requests.
        // Reverse the list until old_head.
        WriteRequest *tail = NULL;
        WriteRequest *p = new_head;
        do {
            // Spin until the writer links its node (next != UNCONNECTED).
            while (p->next == WriteRequest::UNCONNECTED) {
                // TODO(gejun): elaborate this
                sched_yield();
            }
            WriteRequest *const saved_next = p->next;
            p->next = tail;
            tail = p;
            p = saved_next;
            CHECK(p != NULL);
        } while (p != old_head);

        // Link old list with new list.
        old_head->next = tail;
        // Call Setup() from oldest to newest, notice that the calling sequence
        // matters for protocols using pipelined_count, this is why we don't
        // call Setup in above loop which is from newest to oldest.
        for (WriteRequest *q = tail; q; q = q->next) {
            q->Setup(this);
        }
        if (new_tail) {
            *new_tail = new_head;
        }
        return false;
    }

    // Blocks (on kthread) until `fd' becomes writable, `abstime' passes, or
    // the socket is failed. Returns 0 when possibly writable (including
    // spurious wakeups), -1 on error. errno from butex_wait is preserved
    // across UnregisterEvent for the caller.
    int Socket::WaitEpollOut(int fd, bool pollin, const timespec *abstime) {
        if (!ValidFileDescriptor(fd)) {
            return 0;
        }
        // Do not need to check addressable since it will be called by
        // health checker which called `SetFailed' before
        // Snapshot the butex value before registering so a wakeup between
        // RegisterEvent and butex_wait is not lost.
        const int expected_val = _epollout_butex->load(kutil::memory_order_relaxed);
        if (_io_event.RegisterEvent(fd, pollin) != 0) {
            return -1;
        }

        int rc = kthread::butex_wait(_epollout_butex, expected_val, abstime);
        const int saved_errno = errno;
        if (rc < 0 && errno == EWOULDBLOCK) {
            // Could be writable or spurious wakeup
            rc = 0;
        }
        // Ignore return value since `fd' might have been removed
        // by `RemoveConsumer' in `SetFailed'
        kutil::ignore_result(_io_event.UnregisterEvent(fd, pollin));
        errno = saved_errno;
        // Could be writable or spurious wakeup (by former epollout)
        return rc;
    }

    // Initiates a non-blocking TCP connect to remote_side(). With
    // `on_connect', the connect is asynchronous: a temporary Socket owning
    // an EpollOutRequest is registered with epoll and `on_connect' fires
    // when the fd becomes writable (or the optional `abstime' timer
    // expires). Without it, the call waits (on kthread) and verifies the
    // connection. Returns the connected fd (ownership released to caller)
    // or -1 on error.
    int Socket::Connect(const timespec *abstime,
                        int (*on_connect)(int, int, void *), void *data) {
        // Decide the SSL state up-front; the handshake itself happens later
        // in CheckConnected()/SSLHandshake().
        if (_ssl_ctx) {
            _ssl_state = SSL_CONNECTING;
        } else {
            _ssl_state = SSL_OFF;
        }
        struct sockaddr_storage serv_addr;
        socklen_t addr_size = 0;
        if (kutil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0) {
            PLOG(ERROR) << "Fail to get sockaddr";
            return -1;
        }
        // fd_guard closes the fd automatically on every early return below.
        kutil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));
        if (sockfd < 0) {
            PLOG(ERROR) << "Fail to create socket";
            return -1;
        }
        CHECK_EQ(0, kutil::make_close_on_exec(sockfd));
        // We need to do async connect (to manage the timeout by ourselves).
        CHECK_EQ(0, kutil::make_non_blocking(sockfd));

        // EINPROGRESS is the expected result of a non-blocking connect.
        const int rc = ::connect(
                sockfd, (struct sockaddr *) &serv_addr, addr_size);
        if (rc != 0 && errno != EINPROGRESS) {
            PLOG(WARNING) << "Fail to connect to " << remote_side();
            return -1;
        }
        if (on_connect) {
            EpollOutRequest *req = new(std::nothrow) EpollOutRequest;
            if (req == NULL) {
                LOG(FATAL) << "Fail to new EpollOutRequest";
                return -1;
            }
            req->fd = sockfd;
            req->timer_id = 0;
            req->on_epollout_event = on_connect;
            req->data = data;
            // A temporary Socket to hold `EpollOutRequest', which will
            // be added into epoll device soon
            SocketId connect_id;
            SocketOptions options;
            options.kthread_tag = _io_event.kthread_tag();
            options.user = req;
            if (Socket::Create(options, &connect_id) != 0) {
                LOG(FATAL) << "Fail to create Socket";
                delete req;
                return -1;
            }
            // From now on, ownership of `req' has been transferred to
            // `connect_id'. We hold an additional reference here to
            // ensure `req' to be valid in this scope
            SocketUniquePtr s;
            CHECK_EQ(0, Socket::Address(connect_id, &s));

            // Add `sockfd' into epoll so that `HandleEpollOutRequest' will
            // be called with `req' when epoll event reaches
            if (s->_io_event.RegisterEvent(sockfd, false) != 0) {
                const int saved_errno = errno;
                PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";
                s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",
                             (int) sockfd, berror(saved_errno));
                return -1;
            }

            // Register a timer for EpollOutRequest. Note that the timeout
            // callback has no race with the one above as both of them try
            // to `SetFailed' `connect_id' while only one of them can succeed
            // It also works when `HandleEpollOutRequest' has already been
            // called before adding the timer since it will be removed
            // inside destructor of `EpollOutRequest' after leaving this scope
            if (abstime) {
                int rc = kthread_timer_add(&req->timer_id, *abstime,
                                           HandleEpollOutTimeout,
                                           (void *) connect_id);
                if (rc) {
                    LOG(ERROR) << "Fail to add timer: " << berror(rc);
                    s->SetFailed(rc, "Fail to add timer: %s", berror(rc));
                    return -1;
                }
            }

        } else {
            // Synchronous path: wait for writability, then verify SO_ERROR
            // (and run the SSL handshake) in CheckConnected().
            if (WaitEpollOut(sockfd, false, abstime) != 0) {
                PLOG(WARNING) << "Fail to wait EPOLLOUT of fd=" << sockfd;
                return -1;
            }
            if (CheckConnected(sockfd) != 0) {
                return -1;
            }
        }
        return sockfd.release();
    }

    // Verifies that an asynchronous connect on `sockfd' actually succeeded
    // by reading SO_ERROR, then performs the client-side SSL handshake.
    // Returns 0 on success; on failure returns -1 with errno set to the
    // pending socket error.
    int Socket::CheckConnected(int sockfd) {
        // Fake fds used by Streams have no underlying connection to check.
        if (sockfd == STREAM_FAKE_FD) {
            return 0;
        }
        int err = 0;
        socklen_t errlen = sizeof(err);
        // SO_ERROR yields (and clears) the deferred error of the connect.
        if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) {
            PLOG(ERROR) << "Fail to getsockopt of fd=" << sockfd;
            return -1;
        }
        if (err != 0) {
            // The fd was reported writable, so the connect must be resolved.
            CHECK_NE(err, EINPROGRESS);
            errno = err;
            return -1;
        }

        if (turbo::get_flag(FLAGS_log_connected)) {
            kutil::EndPoint local_point;
            CHECK_EQ(0, kutil::get_local_side(sockfd, &local_point));
            LOG(INFO) << "Connected to " << remote_side()
                      << " via fd=" << (int) sockfd << " SocketId=" << id()
                      << " local_side=" << local_point;
        }

        // Doing SSL handshake after TCP connected
        return SSLHandshake(sockfd, false);
    }

    int Socket::DoConnect(const timespec *abstime,
                          int (*on_connect)(int, int, void *), void *data) {
        if (_conn) {
            return _conn->Connect(this, abstime, on_connect, data);
        } else {
            return Connect(abstime, on_connect, data);
        }
    }

    // Starts connecting if there is no usable fd yet. Returns 0 when
    // already connected, 1 when a connection is in progress (the callback
    // `KeepWriteIfConnected' will resume writing `req'), -1 on error.
    int Socket::ConnectIfNot(const timespec *abstime, WriteRequest *req) {
        if (_fd.load(kutil::memory_order_consume) >= 0) {
            return 0;
        }
        // Set tag for client side socket
        _io_event.set_kthread_tag(kthread_self_tag());
        // Have to hold a reference for `req'
        SocketUniquePtr s;
        ReAddress(&s);
        req->set_socket(s.get());
        if (DoConnect(abstime, KeepWriteIfConnected, req) < 0) {
            return -1;
        }
        // Deliberately leak the reference: it is now owned by `req' and
        // will be released on the KeepWriteIfConnected path.
        s.release();
        return 1;
    }

    // Simulates an EPOLLOUT event: bumps the butex value (release pairs
    // with the waiters' load) and wakes everyone blocked in WaitEpollOut().
    void Socket::WakeAsEpollOut() {
        _epollout_butex->fetch_add(1, kutil::memory_order_release);
        kthread::butex_wake_except(_epollout_butex, 0);
    }

    // Dispatcher callback for output (writable) events. Resolves the
    // SocketId packed in `user_data', routes pending-connect sockets to
    // HandleEpollOutRequest, and wakes WaitEpollOut() waiters otherwise.
    int Socket::OnOutputEvent(void *user_data, uint32_t,
                              const kthread_attr_t &) {
        auto id = reinterpret_cast<SocketId>(user_data);
        SocketUniquePtr s;
        // Since Sockets might have been `SetFailed' before they were
        // added into epoll, these sockets miss the signal inside
        // `SetFailed' and therefore must be signalled here using
        // `AddressFailedAsWell' to prevent waiting forever
        if (Socket::AddressFailedAsWell(id, &s) < 0) {
            // Ignore recycled sockets
            return -1;
        }

        // A non-NULL EpollOutRequest means this is the temporary socket
        // created by Connect() for an asynchronous connect.
        EpollOutRequest *req = dynamic_cast<EpollOutRequest *>(s->user());
        if (req != NULL) {
            return s->HandleEpollOutRequest(0, req);
        }

        // Currently `WaitEpollOut' needs `_epollout_butex'
        // TODO(jiangrujie): Remove this in the future
        s->_epollout_butex->fetch_add(1, kutil::memory_order_relaxed);
        kthread::butex_wake_except(s->_epollout_butex, 0);
        return 0;
    }

    void Socket::HandleEpollOutTimeout(void *arg) {
        SocketId id = (SocketId) arg;
        SocketUniquePtr s;
        if (Socket::Address(id, &s) != 0) {
            return;
        }
        EpollOutRequest *req = dynamic_cast<EpollOutRequest *>(s->user());
        if (req == NULL) {
            LOG(FATAL) << "Impossible! SocketUser MUST be EpollOutRequest here";
            return;
        }
        s->HandleEpollOutRequest(ETIMEDOUT, req);
    }

    // Completes an asynchronous connect attempt with `error_code' (0 for
    // writable, ETIMEDOUT from the timer). SetFailed() acts as the mutual
    // exclusion between the epoll path and the timeout path: only the
    // winner runs the user callback.
    int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest *req) {
        // Only one thread can `SetFailed' this `Socket' successfully
        // Also after this `req' will be destroyed when its reference
        // hits zero
        if (SetFailed() != 0) {
            return -1;
        }
        // We've got the right to call user callback
        // The timer will be removed inside destructor of EpollOutRequest
        kutil::ignore_result(_io_event.UnregisterEvent(req->fd, false));
        return req->on_epollout_event(req->fd, error_code, req->data);
    }

    // Callback invoked when the application-level connect finishes.
    // On success, resumes writing pending requests in a KeepWrite kthread.
    // On failure, fails the socket (escalating repeated connect timeouts to
    // ENETUNREACH) and releases all queued write requests.
    void Socket::AfterAppConnected(int err, void *data) {
        WriteRequest *req = static_cast<WriteRequest *>(data);
        if (err == 0) {
            Socket *const s = req->get_socket();
            SharedPart *sp = s->GetSharedPart();
            if (sp) {
                sp->num_continuous_connect_timeouts.store(0, kutil::memory_order_relaxed);
            }
            // requests are not setup yet. check the comment on Setup() in Write()
            req->Setup(s);
            kthread_t th;
            if (kthread_start_background(
                    &th, &KTHREAD_ATTR_NORMAL, KeepWrite, req) != 0) {
                // Fall back to writing synchronously in this thread.
                PLOG(WARNING) << "Fail to start KeepWrite";
                KeepWrite(req);
            }
        } else {
            // Take over the reference held by `req' so it is released when
            // this scope ends.
            SocketUniquePtr s(req->get_socket());
            if (err == ETIMEDOUT) {
                SharedPart *sp = s->GetOrNewSharedPart();
                if (sp->num_continuous_connect_timeouts.fetch_add(
                        1, kutil::memory_order_relaxed) + 1 >=
                    turbo::get_flag(FLAGS_connect_timeout_as_unreachable)) {
                    // the race between store and fetch_add(in another thread) is
                    // OK since a critical error is about to return.
                    sp->num_continuous_connect_timeouts.store(
                            0, kutil::memory_order_relaxed);
                    err = ENETUNREACH;
                }
            }

            s->SetFailed(err, "Fail to connect %s: %s",
                         s->description().c_str(), berror(err));
            s->ReleaseAllFailedWriteRequests(req);
        }
    }

    static void *RunClosure(void *arg) {
        google::protobuf::Closure *done = (google::protobuf::Closure *) arg;
        done->Run();
        return NULL;
    }

    // Connect-completion callback installed by ConnectIfNot(). When an SSL
    // handshake is still needed, defers CheckConnectedAndKeepWrite to a new
    // kthread (SSL may block); otherwise runs it inline. Always returns 0.
    int Socket::KeepWriteIfConnected(int fd, int err, void *data) {
        WriteRequest *req = static_cast<WriteRequest *>(data);
        Socket *s = req->get_socket();
        if (err == 0 && s->ssl_state() == SSL_CONNECTING) {
            // Run ssl connect in a new kthread to avoid blocking
            // the current kthread (thus blocking the EventDispatcher)
            kthread_t th;
            std::unique_ptr<google::protobuf::Closure> thrd_func(krpc::NewCallback(
                    Socket::CheckConnectedAndKeepWrite, fd, err, data));
            if ((err = kthread_start_background(&th, &KTHREAD_ATTR_NORMAL,
                                                RunClosure, thrd_func.get())) == 0) {
                // Ownership of the closure is handed to the new kthread.
                thrd_func.release();
                return 0;
            } else {
                PLOG(ERROR) << "Fail to start kthread";
                // Fall through with non zero `err'
            }
        }
        CheckConnectedAndKeepWrite(fd, err, data);
        return 0;
    }

    // Verifies the freshly-connected `fd', installs it on the socket and
    // resumes the pending write (directly or through the app-level
    // connector). On any failure the fd is closed by the guard and all
    // queued write requests are failed via AfterAppConnected.
    void Socket::CheckConnectedAndKeepWrite(int fd, int err, void *data) {
        // Guard owns the fd until it is successfully handed to the Socket.
        kutil::fd_guard sockfd(fd);
        WriteRequest *req = static_cast<WriteRequest *>(data);
        Socket *s = req->get_socket();
        CHECK_GE(sockfd, 0);
        if (err == 0 && s->CheckConnected(sockfd) == 0
            && s->ResetFileDescriptor(sockfd) == 0) {
            if (s->CreatedByConnect()) {
                g_vars->channel_conn << 1;
            }
            if (s->_app_connect) {
                s->_app_connect->StartConnect(req->get_socket(),
                                              AfterAppConnected, req);
            } else {
                // Successfully created a connection
                AfterAppConnected(0, req);
            }
            // Release this socket for KeepWrite
            sockfd.release();
        } else {
            if (err == 0) {
                // Derive an error code; -1 when errno carries nothing.
                err = errno ? errno : -1;
            }
            AfterAppConnected(err, req);
        }
    }

    // Reports error `ec': with a waiting kthread id, deliver it there and
    // return 0; without one, behave like a syscall (set errno, return -1).
    inline int SetError(kthread_id_t id_wait, int ec) {
        if (id_wait == INVALID_KTHREAD_ID) {
            errno = ec;
            return -1;
        }
        kthread_id_error(id_wait, ec);
        return 0;
    }

    // Propagates this socket's recorded error to `id_wait' (or errno when
    // no waiter is given). Returns 0 after signalling the waiter, -1 after
    // setting errno, 1 when the socket has not failed. The mutex is always
    // released before signalling to avoid calling out while locked.
    int Socket::ConductError(kthread_id_t id_wait) {
        pthread_mutex_lock(&_id_wait_list_mutex);
        if (Failed()) {
            const int error_code = non_zero_error_code();
            if (id_wait != INVALID_KTHREAD_ID) {
                // Copy the text under the lock, signal after unlocking.
                const std::string error_text = _error_text;
                pthread_mutex_unlock(&_id_wait_list_mutex);
                kthread_id_error2(id_wait, error_code, error_text);
                return 0;
            } else {
                pthread_mutex_unlock(&_id_wait_list_mutex);
                errno = error_code;
                return -1;
            }
        } else {
            pthread_mutex_unlock(&_id_wait_list_mutex);
            return 1;
        }
    }

    // Returns the peer's SSL certificate, or NULL when the session is not
    // in the CONNECTED state.
    // NOTE(review): per OpenSSL, SSL_get_peer_certificate returns a new
    // reference that the caller must free with X509_free — confirm callers do.
    X509 *Socket::GetPeerCertificate() const {
        if (ssl_state() != SSL_CONNECTED) {
            return NULL;
        }
        KRPC_SCOPED_LOCK(_ssl_session_mutex);
        return SSL_get_peer_certificate(_ssl_session);
    }

    // Queues `data' (swapped out, so the caller's buffer is emptied) for
    // writing on this socket. Validates options, checks failure/crowding,
    // wraps the payload in a WriteRequest and hands it to StartWrite().
    // Returns 0 on success; on error either signals opt.id_wait or sets
    // errno (see SetError).
    int Socket::Write(kutil::IOBuf *data, const WriteOptions *options_in) {
        WriteOptions opt;
        if (options_in) {
            opt = *options_in;
        }
        if (data->empty()) {
            return SetError(opt.id_wait, EINVAL);
        }
        if (opt.pipelined_count > MAX_PIPELINED_COUNT) {
            LOG(ERROR) << "pipelined_count=" << opt.pipelined_count
                       << " is too large";
            return SetError(opt.id_wait, EOVERFLOW);
        }
        if (Failed()) {
            // rc <= 0 means the error was conducted to the waiter/errno.
            const int rc = ConductError(opt.id_wait);
            if (rc <= 0) {
                return rc;
            }
        }

        if (!opt.ignore_eovercrowded && _overcrowded) {
            return SetError(opt.id_wait, EOVERCROWDED);
        }

        WriteRequest *req = kutil::get_object<WriteRequest>();
        if (!req) {
            return SetError(opt.id_wait, ENOMEM);
        }

        req->data.swap(*data);
        // Set `req->next' to UNCONNECTED so that the KeepWrite thread will
        // wait until it points to a valid WriteRequest or NULL.
        req->next = WriteRequest::UNCONNECTED;
        req->id_wait = opt.id_wait;
        req->clear_and_set_control_bits(opt.notify_on_success, opt.shutdown_write);
        req->set_pipelined_count_and_user_message(
                opt.pipelined_count, DUMMY_USER_MESSAGE, opt.auth_flags);
        return StartWrite(req, opt);
    }

    // Queues a SocketMessage for writing; ownership of `msg' is released
    // into the WriteRequest. Mirrors the IOBuf overload except there is no
    // empty-data check — the payload is produced from `msg' later
    // (presumably during serialization — see set_pipelined_count_and_user_message).
    int Socket::Write(SocketMessagePtr<> &msg, const WriteOptions *options_in) {
        WriteOptions opt;
        if (options_in) {
            opt = *options_in;
        }
        if (opt.pipelined_count > MAX_PIPELINED_COUNT) {
            LOG(ERROR) << "pipelined_count=" << opt.pipelined_count
                       << " is too large";
            return SetError(opt.id_wait, EOVERFLOW);
        }

        if (Failed()) {
            // rc <= 0 means the error was conducted to the waiter/errno.
            const int rc = ConductError(opt.id_wait);
            if (rc <= 0) {
                return rc;
            }
        }

        if (!opt.ignore_eovercrowded && _overcrowded) {
            return SetError(opt.id_wait, EOVERCROWDED);
        }

        WriteRequest *req = kutil::get_object<WriteRequest>();
        if (!req) {
            return SetError(opt.id_wait, ENOMEM);
        }

        // Set `req->next' to UNCONNECTED so that the KeepWrite thread will
        // wait until it points to a valid WriteRequest or NULL.
        req->next = WriteRequest::UNCONNECTED;
        req->id_wait = opt.id_wait;
        req->clear_and_set_control_bits(opt.notify_on_success, opt.shutdown_write);
        req->set_pipelined_count_and_user_message(
                opt.pipelined_count, msg.release(), opt.auth_flags);
        return StartWrite(req, opt);
    }

    // Pushes `req' onto the MPSC write list. The thread whose exchange sees
    // a NULL previous head becomes the writer: it connects if needed, tries
    // one inline write, and defers the remainder to a KeepWrite kthread.
    // Returns 0 on success, -1 with errno set on failure.
    int Socket::StartWrite(WriteRequest *req, const WriteOptions &opt) {
        // Release fence makes sure the thread getting request sees *req
        WriteRequest *const prev_head =
                _write_head.exchange(req, kutil::memory_order_release);
        if (prev_head != NULL) {
            // Someone is writing to the fd. The KeepWrite thread may spin
            // until req->next to be non-UNCONNECTED. This process is not
            // lock-free, but the duration is so short(1~2 instructions,
            // depending on compiler) that the spin rarely occurs in practice
            // (I've not seen any spin in highly contended tests).
            req->next = prev_head;
            return 0;
        }

        int saved_errno = 0;
        kthread_t th;
        SocketUniquePtr ptr_for_keep_write;
        ssize_t nw = 0;
        int ret = 0;

        // We've got the right to write.
        req->next = NULL;

        // Fast fail when write has been shutdown.
        if (_is_write_shutdown) {
            goto FAIL_TO_WRITE;
        }
        _is_write_shutdown = req->need_shutdown_write();

        // Connect to remote_side() if not.
        ret = ConnectIfNot(opt.abstime, req);
        if (ret < 0) {
            saved_errno = errno;
            SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
            goto FAIL_TO_WRITE;
        } else if (ret == 1) {
            // We are doing connection. Callback `KeepWriteIfConnected'
            // will be called with `req' at any moment after
            return 0;
        }

        // NOTE: Setup() MUST be called after Connect which may call app_connect,
        // which is assumed to run before any SocketMessage.AppendAndDestroySelf()
        // in some protocols(namely RTMP).
        req->Setup(this);

        if (opt.write_in_background || ssl_state() != SSL_OFF) {
            // Writing into SSL may block the current kthread, always write
            // in the background.
            goto KEEPWRITE_IN_BACKGROUND;
        }

        // Write once in the calling thread. If the write is not complete,
        // continue it in KeepWrite thread.
        if (_conn) {
            kutil::IOBuf *data_arr[1] = {&req->data};
            nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
        } else {
#if KRPC_WITH_RDMA
            if (_rdma_ep && _rdma_state != RDMA_OFF) {
                kutil::IOBuf* data_arr[1] = { &req->data };
                nw = _rdma_ep->CutFromIOBufList(data_arr, 1);
            } else {
#else
            {
#endif
                nw = req->data.cut_into_file_descriptor(fd());
            }
        }
        if (nw < 0) {
            // RTMP may return EOVERCROWDED
            if (errno != EAGAIN && errno != EOVERCROWDED) {
                saved_errno = errno;
                // EPIPE is common in pooled connections + backup requests.
                PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
                SetFailed(saved_errno, "Fail to write into %s: %s",
                          description().c_str(), berror(saved_errno));
                goto FAIL_TO_WRITE;
            }
        } else {
            AddOutputBytes(nw);
        }
        if (IsWriteComplete(req, true, NULL)) {
            // Everything was written and no concurrent writers appended more.
            ReturnSuccessfulWriteRequest(req);
            return 0;
        }

        KEEPWRITE_IN_BACKGROUND:
        // Hand a leaked reference to the KeepWrite kthread via the request.
        ReAddress(&ptr_for_keep_write);
        req->set_socket(ptr_for_keep_write.release());
        if (kthread_start_background(&th, &KTHREAD_ATTR_NORMAL,
                                     KeepWrite, req) != 0) {
            LOG(FATAL) << "Fail to start KeepWrite";
            KeepWrite(req);
        }
        return 0;

        FAIL_TO_WRITE:
        // `SetFailed' before `ReturnFailedWriteRequest' (which will calls
        // `on_reset' callback inside the id object) so that we immediately
        // know this socket has failed inside the `on_reset' callback
        ReleaseAllFailedWriteRequests(req);
        errno = saved_errno;
        return -1;
    }

    // Upper bound on the number of IOBufs batched into one DoWrite() call.
    static const size_t DATA_LIST_MAX = 256;

    // Entry of the background kthread that keeps flushing the pending
    // WriteRequest list of a Socket until everything is written
    // (IsWriteComplete) or the socket fails. `void_arg' is the head
    // WriteRequest; it carries one socket reference (stashed via set_socket
    // by the code that started this thread) which this thread takes over.
    void *Socket::KeepWrite(void *void_arg) {
        g_vars->nkeepwrite << 1;
        WriteRequest *req = static_cast<WriteRequest *>(void_arg);
        // Take over the reference transferred through req->set_socket().
        SocketUniquePtr s(req->get_socket());

        // When error occurs, spin until there's no more requests instead of
        // returning directly otherwise _write_head is permanently non-NULL which
        // makes later Write() abnormal.
        WriteRequest *cur_tail = NULL;
        do {
            // req may have already been written by the starting thread, skip it.
            bool need_shutdown = false;
            if (req->next != NULL && req->data.empty()) {
                WriteRequest *const saved_req = req;
                need_shutdown = req->need_shutdown_write();
                req = req->next;
                s->ReturnSuccessfulWriteRequest(saved_req);
            }
            if (need_shutdown) {
                LOG(WARNING) << "Shutdown write of " << *s;
                break;
            }

            const ssize_t nw = s->DoWrite(req);
            if (nw < 0) {
                // EAGAIN/EOVERCROWDED are transient; any other error fails the socket.
                if (errno != EAGAIN && errno != EOVERCROWDED) {
                    const int saved_errno = errno;
                    PLOG(WARNING) << "Fail to keep-write into " << *s;
                    s->SetFailed(saved_errno, "Fail to keep-write into %s: %s",
                                 s->description().c_str(), berror(saved_errno));
                    break;
                }
            } else {
                s->AddOutputBytes(nw);
            }
            // Release WriteRequest until non-empty data, last request or shutdown write.
            while (req->next != NULL && req->data.empty()) {
                WriteRequest *const saved_req = req;
                need_shutdown = req->need_shutdown_write();
                req = req->next;
                s->ReturnSuccessfulWriteRequest(saved_req);
                if (need_shutdown) {
                    break;
                }
            }
            if (need_shutdown) {
                LOG(WARNING) << "Shutdown write of " << *s;
                break;
            }
            // TODO(gejun): wait for epollout when we actually have written
            // all the data. This weird heuristic reduces 30us delay...
            // Update(12/22/2015): seem not working. better switch to correct code.
            // Update(1/8/2016, r31823): Still working.
            // Update(8/15/2017): Not working, performance downgraded.
            //if (nw <= 0 || req->data.empty()/*note*/) {
            if (nw <= 0) {
                // NOTE: Waiting epollout within timeout is a must to force
                // KeepWrite to check and setup pending WriteRequests periodically,
                // which may turn on _overcrowded to stop pending requests from
                // growing infinitely.
                const timespec duetime =
                        kutil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
#if KRPC_WITH_RDMA
                if (s->_rdma_state == RDMA_ON) {
                    const int expected_val = s->_epollout_butex
                        ->load(kutil::memory_order_acquire);
                    CHECK(s->_rdma_ep != NULL);
                    if (!s->_rdma_ep->IsWritable()) {
                        g_vars->nwaitepollout << 1;
                        if (kthread::butex_wait(s->_epollout_butex,
                                expected_val, &duetime) < 0) {
                            if (errno != EAGAIN && errno != ETIMEDOUT) {
                                const int saved_errno = errno;
                                PLOG(WARNING) << "Fail to wait rdma window of " << *s;
                                s->SetFailed(saved_errno, "Fail to wait rdma window of %s: %s",
                                        s->description().c_str(), berror(saved_errno));
                            }
                            if (s->Failed()) {
                                // NOTE:
                                // Different from TCP, we cannot find the RDMA channel
                                // failed by writing to it. Thus we must check if it
                                // is already failed here.
                                break;
                            }
                        }
                    }
                } else {
#else
                {
#endif
                    g_vars->nwaitepollout << 1;
                    // Also poll for input when the socket has an input handler,
                    // so read events are noticed while waiting for writability.
                    bool pollin = (s->_on_edge_triggered_events != NULL);
                    const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
                    if (rc < 0 && errno != ETIMEDOUT) {
                        const int saved_errno = errno;
                        PLOG(WARNING) << "Fail to wait epollout of " << *s;
                        s->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
                                     s->description().c_str(), berror(saved_errno));
                        break;
                    }
                }
            }
            // Lazily locate the current tail once; IsWriteComplete advances it
            // (via the out-param) when more requests were chained concurrently.
            if (NULL == cur_tail) {
                for (cur_tail = req; cur_tail->next != NULL;
                     cur_tail = cur_tail->next);
            }
            // Return when there's no more WriteRequests and req is completely
            // written.
            if (s->IsWriteComplete(cur_tail, (req == cur_tail), &cur_tail)) {
                CHECK_EQ(cur_tail, req);
                s->ReturnSuccessfulWriteRequest(req);
                return NULL;
            }
        } while (1);

        // Error occurred, release all requests until no new requests.
        s->ReleaseAllFailedWriteRequests(req);
        return NULL;
    }

    // Writes as many queued WriteRequests as possible in one batched call.
    // Returns the number of bytes cut from the IOBufs, or -1 with errno set.
    // Dispatches to the app-level connection, RDMA endpoint, raw fd or SSL
    // channel depending on the socket's configuration.
    ssize_t Socket::DoWrite(WriteRequest *req) {
        // Group kutil::IOBuf in the list into a batch array.
        kutil::IOBuf *data_list[DATA_LIST_MAX];
        size_t ndata = 0;
        for (WriteRequest *p = req; p != NULL && ndata < DATA_LIST_MAX;
             p = p->next) {
            data_list[ndata++] = &p->data;
            if (p->need_shutdown_write()) {
                // Write WriteRequest until shutdown write.
                _is_write_shutdown = true;
                break;
            }
        }

        if (ssl_state() == SSL_OFF) {
            // Write IOBuf in the batch array into the fd.
            if (_conn) {
                return _conn->CutMessageIntoFileDescriptor(fd(), data_list, ndata);
            } else {
#if KRPC_WITH_RDMA
                if (_rdma_ep && _rdma_state != RDMA_OFF) {
                    return _rdma_ep->CutFromIOBufList(data_list, ndata);
                }
#endif
                return kutil::IOBuf::cut_multiple_into_file_descriptor(
                        fd(), data_list, ndata);
            }
        }

        CHECK_EQ(SSL_CONNECTED, ssl_state());
        if (_conn) {
            // TODO: Separate SSL stuff from SocketConnection
            KRPC_SCOPED_LOCK(_ssl_session_mutex);
            return _conn->CutMessageIntoSSLChannel(_ssl_session, data_list, ndata);
        }
        int ssl_error = 0;
        ssize_t nw = 0;
        {
            // _ssl_session is shared with readers; serialize access.
            KRPC_SCOPED_LOCK(_ssl_session_mutex);
            nw = kutil::IOBuf::cut_multiple_into_SSL_channel(_ssl_session, data_list, ndata, &ssl_error);
        }
        switch (ssl_error) {
            case SSL_ERROR_NONE:
                break;

            case SSL_ERROR_WANT_READ:
                // Disable renegotiation
                errno = EPROTO;
                return -1;

            case SSL_ERROR_WANT_WRITE:
                // Treated like a transient failure; callers retry on EAGAIN.
                errno = EAGAIN;
                break;

            default: {
                const unsigned long e = ERR_get_error();
                if (e != 0) {
                    LOG(WARNING) << "Fail to write into ssl_fd=" << fd() << ": "
                                 << SSLError(e);
                    errno = ESSL;
                } else {
                    // System error with corresponding errno set
                    PLOG(WARNING) << "Fail to write into ssl_fd=" << fd();
                }
                break;
            }
        }
        return nw;
    }

    // Performs the SSL handshake on `fd'. Returns 0 on success (or when a
    // client has no SSL context configured, meaning plain TCP), -1 on failure.
    // Blocks the calling kthread (not the pthread) while waiting for fd
    // readiness.
    int Socket::SSLHandshake(int fd, bool server_mode) {
        if (_ssl_ctx == NULL) {
            if (server_mode) {
                LOG(ERROR) << "Lack SSL configuration to handle SSL request";
                return -1;
            }
            // Client without an SSL context: nothing to handshake.
            return 0;
        }

        // TODO: Reuse ssl session id for client
        if (_ssl_session) {
            // Free the last session, which may be deprecated when socket failed
            SSL_free(_ssl_session);
        }
        _ssl_session = CreateSSLSession(_ssl_ctx->raw_ctx, id(), fd, server_mode);
        if (_ssl_session == NULL) {
            LOG(ERROR) << "Fail to CreateSSLSession";
            return -1;
        }
#if defined(SSL_CTRL_SET_TLSEXT_HOSTNAME)
        if (!_ssl_ctx->sni_name.empty()) {
            SSL_set_tlsext_host_name(_ssl_session, _ssl_ctx->sni_name.c_str());
        }
#endif

        _ssl_state = SSL_CONNECTING;

        // Loop until SSL handshake has completed. For SSL_ERROR_WANT_READ/WRITE,
        // we use kthread_fd_wait as polling mechanism instead of EventDispatcher
        // as it may confuse the origin event processing code.
        while (true) {
            ERR_clear_error();
            int rc = SSL_do_handshake(_ssl_session);
            if (rc == 1) {
                // In client, check if server returned ALPN selection is acceptable.
                if (!server_mode && !_ssl_ctx->alpn_protocols.empty()) {
                    const unsigned char *alpn_proto;
                    unsigned int alpn_proto_length;
                    SSL_get0_alpn_selected(_ssl_session, &alpn_proto, &alpn_proto_length);
                    if (!alpn_proto) {
                        LOG(ERROR) << "Server returned no ALPN protocol";
                        return -1;
                    }

                    std::string alpn_protocol(
                            reinterpret_cast<char const *>(alpn_proto),
                            alpn_proto_length
                    );
                    // The selected protocol must be one we offered.
                    if (
                            std::find(
                                    _ssl_ctx->alpn_protocols.begin(),
                                    _ssl_ctx->alpn_protocols.end(),
                                    alpn_protocol
                            ) == _ssl_ctx->alpn_protocols.end()
                            ) {
                        LOG(ERROR) << "Server returned unacceptable ALPN protocol: "
                                   << alpn_protocol;
                        return -1;
                    }
                }

                _ssl_state = SSL_CONNECTED;
                AddBIOBuffer(_ssl_session, fd, turbo::get_flag(FLAGS_ssl_bio_buffer_size));
                return 0;
            }

            int ssl_error = SSL_get_error(_ssl_session, rc);
            switch (ssl_error) {
                case SSL_ERROR_WANT_READ:
                    // Handshake needs more input; park this kthread until readable.
#if defined(OS_LINUX)
                    if (kthread_fd_wait(fd, EPOLLIN) != 0) {
#elif defined(OS_MACOSX)
                        if (kthread_fd_wait(fd, EVFILT_READ) != 0) {
#endif
                        return -1;
                    }
                    break;

                case SSL_ERROR_WANT_WRITE:
                    // Handshake needs to send; park this kthread until writable.
#if defined(OS_LINUX)
                    if (kthread_fd_wait(fd, EPOLLOUT) != 0) {
#elif defined(OS_MACOSX)
                        if (kthread_fd_wait(fd, EVFILT_WRITE) != 0) {
#endif
                        return -1;
                    }
                    break;

                default: {
                    const unsigned long e = ERR_get_error();
                    if (ssl_error == SSL_ERROR_ZERO_RETURN || e == 0) {
                        errno = ECONNRESET;
                        LOG(ERROR) << "SSL connection was shutdown by peer: " << _remote_side;
                    } else if (ssl_error == SSL_ERROR_SYSCALL) {
                        PLOG(ERROR) << "Fail to SSL_do_handshake";
                    } else {
                        errno = ESSL;
                        LOG(ERROR) << "Fail to SSL_do_handshake: " << SSLError(e);
                    }
                    return -1;
                }
            }
        }
    }

    // Reads up to `size_hint' bytes from the fd (or SSL channel) into
    // _read_buf. On the very first read the SSL state is detected lazily and
    // a server-side handshake is performed if the peer initiated SSL.
    // Returns bytes appended, 0 on EOF, or -1 with errno set.
    ssize_t Socket::DoRead(size_t size_hint) {
        if (ssl_state() == SSL_UNKNOWN) {
            int error_code = 0;
            _ssl_state = DetectSSLState(fd(), &error_code);
            const SSLState detected = ssl_state();
            if (detected == SSL_UNKNOWN) {
                if (error_code == 0) {  // EOF
                    return 0;
                }
                errno = error_code;
                return -1;
            } else if (detected == SSL_CONNECTING) {
                // Peer started an SSL handshake; complete it as the server.
                if (SSLHandshake(fd(), true) != 0) {
                    errno = EINVAL;
                    return -1;
                }
            } else if (detected == SSL_CONNECTED) {
                CHECK(false) << "Impossible to reach here";
            }
            // SSL_OFF falls through to the plain-read path below.
        }
        // _ssl_state has been set
        if (ssl_state() == SSL_OFF) {
            if (_force_ssl) {
                // SSL is mandatory on this socket but the peer speaks plaintext.
                errno = ESSL;
                return -1;
            }
            CHECK(_rdma_state == RDMA_OFF);
            return _read_buf.append_from_file_descriptor(fd(), size_hint);
        }

        CHECK_EQ(SSL_CONNECTED, ssl_state());
        int err = 0;
        ssize_t nread = 0;
        {
            KRPC_SCOPED_LOCK(_ssl_session_mutex);
            nread = _read_buf.append_from_SSL_channel(_ssl_session, &err, size_hint);
        }
        if (err == SSL_ERROR_WANT_READ) {
            // Regard this error as EAGAIN
            errno = EAGAIN;
        } else if (err == SSL_ERROR_WANT_WRITE) {
            // Disable renegotiation
            errno = EPROTO;
            return -1;
        } else if (err != SSL_ERROR_NONE) {
            const unsigned long e = ERR_get_error();
            if (nread == 0) {
                // Socket EOF or SSL session EOF
            } else if (e != 0) {
                LOG(WARNING) << "Fail to read from ssl_fd=" << fd()
                             << ": " << SSLError(e);
                errno = ESSL;
            } else {
                // System error with corresponding errno set.
                const bool fatal = (err != SSL_ERROR_ZERO_RETURN &&
                                    err != SSL_ERROR_SYSCALL) ||
                                   BIO_fd_non_fatal_error(errno) != 0 ||
                                   nread < 0;
                PLOG_IF(WARNING, fatal) << "Fail to read from ssl_fd=" << fd();
            }
        }
        return nread;
    }

    int Socket::FightAuthentication(int *auth_error) {
        // Use relaxed fence since `kthread_id_trylock' ensures thread safety
        // Here `flag_error' just acts like a cache information
        uint64_t flag_error = _auth_flag_error.load(kutil::memory_order_relaxed);
        if (flag_error & AUTH_FLAG) {
            // Already authenticated
            *auth_error = (int32_t) (flag_error & 0xFFFFFFFFul);
            return EINVAL;
        }
        if (0 == kthread_id_trylock(_auth_id, NULL)) {
            // Winner
            return 0;
        } else {
            // Use relaxed fence since `kthread_id_join' has acquire fence to ensure
            // `_auth_flag_error' to be the latest value
            kthread_id_join(_auth_id);
            flag_error = _auth_flag_error.load(kutil::memory_order_relaxed);
            *auth_error = (int32_t) (flag_error & 0xFFFFFFFFul);
            return EINVAL;
        }
    }

    void Socket::SetAuthentication(int error_code) {
        uint64_t expected = 0;
        // `kthread_id_destroy' has release fence to prevent this CAS being
        // reordered after it.
        if (_auth_flag_error.compare_exchange_strong(
                expected, (AUTH_FLAG | error_code),
                kutil::memory_order_relaxed)) {
            // As expected
            if (error_code != 0) {
                SetFailed(error_code, "Fail to authenticate %s", description().c_str());
            }
            CHECK_EQ(0, kthread_id_unlock_and_destroy(_auth_id));
        }
    }

    // Lazily creates the server-side AuthContext. Must be called at most once;
    // a second call logs FATAL and returns NULL.
    AuthContext *Socket::mutable_auth_context() {
        if (_auth_context == NULL) {
            _auth_context = new(std::nothrow) AuthContext();
            CHECK(_auth_context);
            return _auth_context;
        }
        LOG(FATAL) << "Impossible! This function is supposed to be called "
                      "only once when verification succeeds in server side";
        return NULL;
    }

    // Dispatcher callback invoked when input (or error) events arrive for the
    // Socket identified by `user_data' (a SocketId). Starts a ProcessEvent
    // kthread only for the first unconsumed event, counted via _nevent.
    // Returns 0 when the event is accepted, -1 when the socket is gone or
    // its fd is invalid.
    int Socket::OnInputEvent(void *user_data, uint32_t events,
                             const kthread_attr_t &thread_attr) {
        auto id = reinterpret_cast<SocketId>(user_data);
        SocketUniquePtr s;
        if (Address(id, &s) < 0) {
            return -1;
        }
        if (NULL == s->_on_edge_triggered_events) {
            // Callback can be NULL when receiving error epoll events
            // (Added into epoll by `WaitConnected')
            return 0;
        }
        if (s->fd() < 0) {
#if defined(OS_LINUX)
            CHECK(!(events & EPOLLIN)) << "epoll_events=" << events;
#elif defined(OS_MACOSX)
            CHECK((short)events != EVFILT_READ) << "kqueue filter=" << events;
#endif
            return -1;
        }

        // if (events & has_epollrdhup) {
        //     s->_eof = 1;
        // }
        // Passing e[i].events causes complex visibility issues and
        // requires stronger memory fences, since reading the fd returns
        // error as well, we don't pass the events.
        if (s->_nevent.fetch_add(1, kutil::memory_order_acq_rel) == 0) {
            // According to the stats, above fetch_add is very effective. In a
            // server processing 1 million requests per second, this counter
            // is just 1500~1700/s
            g_vars->neventthread << 1;

            kthread_t tid;
            // transfer ownership as well, don't use s anymore!
            Socket *const p = s.release();

            kthread_attr_t attr = thread_attr;
            attr.keytable_pool = p->_keytable_pool;
            attr.tag = kthread_self_tag();
            if (turbo::get_flag(FLAGS_usercode_in_coroutine)) {
                // Run the event handler inline in the current coroutine.
                ProcessEvent(p);
            } else if (kthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
                LOG(FATAL) << "Fail to start ProcessEvent";
                // Fallback: process synchronously so the event is not lost.
                ProcessEvent(p);
            }
        }
        return 0;
    }

    // NULL-safe helper dropping one reference on `s' (deleter-style).
    void DereferenceSocket(Socket *s) {
        if (s != NULL) {
            s->Dereference();
        }
    }

    // Forwards the per-second stats tick to the SharedPart, if any; no-op
    // for sockets without one.
    void Socket::UpdateStatsEverySecond(int64_t now_ms) {
        if (SharedPart *part = GetSharedPart()) {
            part->UpdateStatsEverySecond(now_ms);
        }
    }

    // Thin wrapper tagging a pointer so that printing it can prepend the
    // pointee's dynamic class name (see the stream operator for ObjectPtr).
    template<typename T>
    struct ObjectPtr {
        const T *_obj;  // borrowed, may be NULL

        ObjectPtr(const T *ptr) : _obj(ptr) {}
    };

    // Factory deducing T so callers can write ShowObject(ptr) without
    // spelling out ObjectPtr<T>.
    template<typename T>
    ObjectPtr<T> ShowObject(const T *obj) {
        return ObjectPtr<T>(obj);
    }

    // Streams "(ClassName*)" followed by the address for non-NULL pointers;
    // NULL pointers print as a bare address.
    template<typename T>
    std::ostream &operator<<(std::ostream &os, const ObjectPtr<T> &obj) {
        if (obj._obj == NULL) {
            return os << obj._obj;
        }
        os << '(' << kutil::class_name_str(*obj._obj) << "*)";
        return os << obj._obj;
    }

    // Dumps a human-readable, multi-line description of the Socket identified
    // by `id' into `os'. Also handles "broken" sockets (failed but still
    // referenced). Read-only: takes short locks and relaxed atomic loads to
    // snapshot state, then probes fd-level TCP options best-effort.
    void Socket::DebugSocket(std::ostream &os, SocketId id) {
        SocketUniquePtr ptr;
        int ret = Socket::AddressFailedAsWell(id, &ptr);
        if (ret < 0) {
            os << "SocketId=" << id << " is invalid or recycled";
            return;
        } else if (ret > 0) {
            // NOTE: Printing a broken socket w/o HC is informational for
            // debugging referential issues.
            // if (ptr->_health_check_interval_s <= 0) {
            //     // Sockets without HC will soon be destroyed
            //     os << "Invalid SocketId=" << id;
            //     return;
            // }
            os << "# This is a broken Socket\n";
        }
        const uint64_t vref = ptr->versioned_ref();
        size_t npipelined = 0;
        size_t idsizes[4];
        size_t nidsize = 0;
        // Snapshot queue sizes under their respective locks before printing.
        {
            KRPC_SCOPED_LOCK(ptr->_pipeline_mutex);
            if (ptr->_pipeline_q) {
                npipelined = ptr->_pipeline_q->size();
            }
        }
        {
            KRPC_SCOPED_LOCK(ptr->_id_wait_list_mutex);
            if (kthread::get_sizes) {
                nidsize = kthread::get_sizes(
                        &ptr->_id_wait_list, idsizes, arraysize(idsizes));
            }
        }
        const int preferred_index = ptr->preferred_index();
        SharedPart *sp = ptr->GetSharedPart();
        os << "version=" << VersionOfVRef(vref);
        if (sp) {
            os << "\nshared_part={\n  ref_count=" << sp->ref_count()
               << "\n  socket_pool=";
            SocketPool *pool = sp->socket_pool.load(kutil::memory_order_consume);
            if (pool) {
                os << '[';
                std::vector<SocketId> pooled_sockets;
                pool->ListSockets(&pooled_sockets, 0);
                for (size_t i = 0; i < pooled_sockets.size(); ++i) {
                    if (i) {
                        os << ' ';
                    }
                    os << pooled_sockets[i];
                }
                os << "]\n  numfree="
                   << pool->_numfree.load(kutil::memory_order_relaxed)
                   << "\n  numinflight="
                   << pool->_numinflight.load(kutil::memory_order_relaxed);
            } else {
                os << "null";
            }
            os << "\n  creator_socket=" << sp->creator_socket_id
               << "\n  in_size=" << sp->in_size.load(kutil::memory_order_relaxed)
               << "\n  in_num_messages=" << sp->in_num_messages.load(kutil::memory_order_relaxed)
               << "\n  out_size=" << sp->out_size.load(kutil::memory_order_relaxed)
               << "\n  out_num_messages=" << sp->out_num_messages.load(kutil::memory_order_relaxed)
               << "\n}";
        }
        const int fd = ptr->_fd.load(kutil::memory_order_relaxed);
        os << "\nnref=" << NRefOfVRef(vref) - 1
           //                                ^
           // minus the ref of current callsite(calling PrintSocket)
           << "\nnevent=" << ptr->_nevent.load(kutil::memory_order_relaxed)
           << "\nfd=" << fd
           << "\ntos=" << ptr->_tos
           << "\nreset_fd_to_now=" << kutil::gettimeofday_us() - ptr->_reset_fd_real_us << "us"
           << "\nremote_side=" << ptr->_remote_side
           << "\nlocal_side=" << ptr->_local_side
           << "\non_et_events=" << (void *) ptr->_on_edge_triggered_events
           << "\nuser=" << ShowObject(ptr->_user)
           << "\nthis_id=" << ptr->id()
           << "\npreferred_index=" << preferred_index;
        InputMessenger *messenger = dynamic_cast<InputMessenger *>(ptr->user());
        if (messenger != NULL) {
            os << " (" << messenger->NameOfProtocol(preferred_index) << ')';
        }
        const int64_t cpuwide_now = kutil::cpuwide_time_us();
        os << "\nhc_count=" << ptr->_hc_count
           << "\navg_input_msg_size=" << ptr->_avg_msg_size
           // NOTE: We're assuming that kutil::IOBuf.size() is thread-safe, it is now
           // however it's not guaranteed.
           << "\nread_buf=" << ptr->_read_buf.size()
           << "\nlast_read_to_now=" << cpuwide_now - ptr->_last_readtime_us << "us"
           << "\nlast_write_to_now=" << cpuwide_now - ptr->_last_writetime_us << "us"
           << "\novercrowded=" << ptr->_overcrowded;
        os << "\nid_wait_list={";
        for (size_t i = 0; i < nidsize; ++i) {
            if (i) {
                os << ' ';
            }
            os << idsizes[i];
        }
        os << '}';
        Destroyable *const parsing_context = ptr->parsing_context();
        Describable *parsing_context_desc = dynamic_cast<Describable *>(parsing_context);
        if (parsing_context_desc) {
            os << "\nparsing_context=" << kutil::class_name_str(*parsing_context) << '{';
            DescribeOptions opt;
            opt.verbose = true;
            IndentingOStream os2(os, 2);
            parsing_context_desc->Describe(os2, opt);
            os << '}';
        } else {
            os << "\nparsing_context=" << ShowObject(parsing_context);
        }
        const SSLState ssl_state = ptr->ssl_state();
        os << "\npipeline_q=" << npipelined
           << "\nhc_interval_s=" << ptr->_health_check_interval_s
           << "\nis_hc_related_ref_held=" << ptr->_is_hc_related_ref_held
           << "\nninprocess=" << ptr->_ninprocess.load(kutil::memory_order_relaxed)
           << "\nauth_flag_error=" << ptr->_auth_flag_error.load(kutil::memory_order_relaxed)
           << "\nauth_id=" << ptr->_auth_id.value
           << "\nauth_context=" << ptr->_auth_context
           << "\nlogoff_flag=" << ptr->_logoff_flag.load(kutil::memory_order_relaxed)
           << "\n_additional_ref_status="
           << ptr->additional_ref_status()
           << "\ntotal_streams_buffer_size="
           << ptr->_total_streams_unconsumed_size.load(kutil::memory_order_relaxed)
           << "\nninflight_app_health_check="
           << ptr->_ninflight_app_health_check.load(kutil::memory_order_relaxed)
           << "\nagent_socket_id=";
        const SocketId asid = ptr->_agent_socket_id.load(kutil::memory_order_relaxed);
        if (asid != INVALID_SOCKET_ID) {
            os << asid;
        } else {
            os << "(none)";
        }
        os << "\ncid=" << ptr->_correlation_id
           << "\nwrite_head=" << ptr->_write_head.load(kutil::memory_order_relaxed)
           << "\nssl_state=" << SSLStateToString(ssl_state);
        const SocketSSLContext *ssl_ctx = ptr->_ssl_ctx.get();
        if (ssl_ctx) {
            os << "\ninitial_ssl_ctx=" << ssl_ctx->raw_ctx;
            if (!ssl_ctx->sni_name.empty()) {
                os << "\nsni_name=" << ssl_ctx->sni_name;
            }
        }
        if (ssl_state == SSL_CONNECTED) {
            os << "\nssl_session={\n  ";
            Print(os, ptr->_ssl_session, "\n  ");
            os << "\n}";
        }

        // NOTE(review): "is_wirte_shutdown" looks like a typo of
        // "is_write_shutdown"; kept as-is since this is the emitted key and
        // tools may already match on it — confirm before renaming.
        os << "\nis_wirte_shutdown=" << ptr->_is_write_shutdown;

        // Best-effort probe of TCP keepalive settings directly from the fd;
        // option names/levels differ between Linux and macOS.
        {
            int keepalive = 0;
            socklen_t len = sizeof(keepalive);
            if (getsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, &len) == 0) {
                os << "\nkeepalive=" << keepalive;
            }
        }

        {
            int keepidle = 0;
            socklen_t len = sizeof(keepidle);
#if defined(OS_MACOSX)
            if (getsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &keepidle, &len) == 0) {
                os << "\ntcp_keepalive_time=" << keepidle;
            }
#elif defined(OS_LINUX)
            if (getsockopt(fd, SOL_TCP, TCP_KEEPIDLE, &keepidle, &len) == 0) {
                os << "\ntcp_keepalive_time=" << keepidle;
            }
#endif
        }

        {
            int keepintvl = 0;
            socklen_t len = sizeof(keepintvl);
#if defined(OS_MACOSX)
            if (getsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &keepintvl, &len) == 0) {
                os << "\ntcp_keepalive_intvl=" << keepintvl;
            }
#elif defined(OS_LINUX)
            if (getsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &keepintvl, &len) == 0) {
                os << "\ntcp_keepalive_intvl=" << keepintvl;
            }
#endif
        }

        {
            int keepcnt = 0;
            socklen_t len = sizeof(keepcnt);
#if defined(OS_MACOSX)
            if (getsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &keepcnt, &len) == 0) {
                os << "\ntcp_keepalive_probes=" << keepcnt;
            }
#elif defined(OS_LINUX)
            if (getsockopt(fd, SOL_TCP, TCP_KEEPCNT, &keepcnt, &len) == 0) {
                os << "\ntcp_keepalive_probes=" << keepcnt;
            }
#endif
        }

        // Dump kernel-level TCP connection state, where the platform offers it.
#if defined(OS_MACOSX)
        struct tcp_connection_info ti;
        socklen_t len = sizeof(ti);
        if (fd >= 0 && getsockopt(fd, IPPROTO_TCP, TCP_CONNECTION_INFO, &ti, &len) == 0) {
            os << "\ntcpi={\n  state=" << (uint32_t)ti.tcpi_state
               << "\n  snd_wscale=" << (uint32_t)ti.tcpi_snd_wscale
               << "\n  rcv_wscale=" << (uint32_t)ti.tcpi_rcv_wscale
               << "\n  options=" << (uint32_t)ti.tcpi_options
               << "\n  flags=" << (uint32_t)ti.tcpi_flags
               << "\n  rto=" << ti.tcpi_rto
               << "\n  maxseg=" << ti.tcpi_maxseg
               << "\n  snd_ssthresh=" << ti.tcpi_snd_ssthresh
               << "\n  snd_cwnd=" << ti.tcpi_snd_cwnd
               << "\n  snd_wnd=" << ti.tcpi_snd_wnd
               << "\n  snd_sbbytes=" << ti.tcpi_snd_sbbytes
               << "\n  rcv_wnd=" << ti.tcpi_rcv_wnd
               << "\n  srtt=" << ti.tcpi_srtt
               << "\n  rttvar=" << ti.tcpi_rttvar
               << "\n}";
        }
#elif defined(OS_LINUX)
        struct tcp_info ti;
        socklen_t len = sizeof(ti);
        if (fd >= 0 && getsockopt(fd, SOL_TCP, TCP_INFO, &ti, &len) == 0) {
            os << "\ntcpi={\n  state=" << (uint32_t) ti.tcpi_state
               << "\n  ca_state=" << (uint32_t) ti.tcpi_ca_state
               << "\n  retransmits=" << (uint32_t) ti.tcpi_retransmits
               << "\n  probes=" << (uint32_t) ti.tcpi_probes
               << "\n  backoff=" << (uint32_t) ti.tcpi_backoff
               << "\n  options=" << (uint32_t) ti.tcpi_options
               << "\n  snd_wscale=" << (uint32_t) ti.tcpi_snd_wscale
               << "\n  rcv_wscale=" << (uint32_t) ti.tcpi_rcv_wscale
               << "\n  rto=" << ti.tcpi_rto
               << "\n  ato=" << ti.tcpi_ato
               << "\n  snd_mss=" << ti.tcpi_snd_mss
               << "\n  rcv_mss=" << ti.tcpi_rcv_mss
               << "\n  unacked=" << ti.tcpi_unacked
               << "\n  sacked=" << ti.tcpi_sacked
               << "\n  lost=" << ti.tcpi_lost
               << "\n  retrans=" << ti.tcpi_retrans
               << "\n  fackets=" << ti.tcpi_fackets
               << "\n  last_data_sent=" << ti.tcpi_last_data_sent
               << "\n  last_ack_sent=" << ti.tcpi_last_ack_sent
               << "\n  last_data_recv=" << ti.tcpi_last_data_recv
               << "\n  last_ack_recv=" << ti.tcpi_last_ack_recv
               << "\n  pmtu=" << ti.tcpi_pmtu
               << "\n  rcv_ssthresh=" << ti.tcpi_rcv_ssthresh
               << "\n  rtt=" << ti.tcpi_rtt  // smoothed
               << "\n  rttvar=" << ti.tcpi_rttvar
               << "\n  snd_ssthresh=" << ti.tcpi_snd_ssthresh
               << "\n  snd_cwnd=" << ti.tcpi_snd_cwnd
               << "\n  advmss=" << ti.tcpi_advmss
               << "\n  reordering=" << ti.tcpi_reordering
               << "\n}";
        }
#endif
#if KRPC_WITH_RDMA
        if (ptr->_rdma_state == RDMA_ON && ptr->_rdma_ep) {
            ptr->_rdma_ep->DebugInfo(os);
        }
#endif
        { os << "\nkthread_tag=" << ptr->_io_event.kthread_tag(); }
    }

    int Socket::CheckHealth() {
        if (_hc_count == 0) {
            LOG(INFO) << "Checking " << *this;
        }
        const timespec duetime =
                kutil::milliseconds_from_now(turbo::get_flag(FLAGS_health_check_timeout_ms));
        const int connected_fd = Connect(&duetime, NULL, NULL);
        if (connected_fd >= 0) {
            ::close(connected_fd);
            return 0;
        }
        return errno;
    }

    // Registers `stream_id' on this socket so it can be reset in
    // ResetAllStreams() when the socket fails.
    // Returns 0 on success, -1 if the socket has already failed.
    int Socket::AddStream(StreamId stream_id) {
        // RAII lock instead of manual lock()/unlock(): exception-safe and
        // consistent with KRPC_SCOPED_LOCK usage elsewhere in this file.
        KRPC_SCOPED_LOCK(_stream_mutex);
        if (Failed()) {
            return -1;
        }
        if (_stream_set == NULL) {
            // Lazily created: most sockets never carry streams.
            _stream_set = new std::set<StreamId>();
        }
        _stream_set->insert(stream_id);
        return 0;
    }

    // Unregisters `stream_id' previously added via AddStream().
    // Returns 0 on success, -1 (with a CHECK failure) if AddStream() was
    // never called on this socket.
    int Socket::RemoveStream(StreamId stream_id) {
        // RAII lock instead of manual lock()/unlock(): exception-safe and
        // consistent with KRPC_SCOPED_LOCK usage elsewhere in this file.
        KRPC_SCOPED_LOCK(_stream_mutex);
        if (_stream_set == NULL) {
            CHECK(false) << "AddStream was not called";
            return -1;
        }
        _stream_set->erase(stream_id);
        return 0;
    }

    void Socket::ResetAllStreams(int error_code, const std::string &error_text) {
        DCHECK(Failed());
        std::set<StreamId> saved_stream_set;
        _stream_mutex.lock();
        if (_stream_set != NULL) {
            // Not delete _stream_set because there are likely more streams added
            // after reviving if the Socket is still in use, or it is to be deleted in
            // BeforeRecycled()
            saved_stream_set.swap(*_stream_set);
        }
        _stream_mutex.unlock();
        for (auto it = saved_stream_set.begin();
             it != saved_stream_set.end(); ++it) {
            Stream::SetFailed(*it, error_code, "%s", error_text.c_str());
        }
    }

    // Default health-checking policy for a SocketUser: delegate to
    // Socket::CheckHealth(), which probes the remote side with a fresh
    // connection. Returns 0 when healthy, an errno otherwise.
    int SocketUser::CheckHealth(Socket *ptr) {
        return ptr->CheckHealth();
    }

    // Default hook invoked after a failed socket passes health checking and
    // is revived; just logs the event.
    void SocketUser::AfterRevived(Socket *ptr) {
        LOG(INFO) << "Revived " << *ptr << " (Connectable)";
    }

////////// SocketPool //////////////

    // Constructs an empty pool; `opt' is kept as the template for creating
    // new pooled sockets in GetSocket(). Counters start at zero.
    inline SocketPool::SocketPool(const SocketOptions &opt)
            : _options(opt), _remote_side(opt.remote_side), _numfree(0), _numinflight(0) {
    }

    // Drops the additional reference held on every socket still sitting in
    // the pool so they can be recycled. Ids that no longer resolve are
    // silently skipped.
    inline SocketPool::~SocketPool() {
        for (SocketId pooled_id : _pool) {
            SocketUniquePtr sock;
            if (Socket::Address(pooled_id, &sock) == 0) {
                sock->ReleaseAdditionalReference();
            }
        }
    }

    // Pops a live socket from the pool into *ptr, creating a fresh one when
    // the pool is empty (or pooling is disabled). Returns 0 on success,
    // -1 when a new socket could not be created.
    inline int SocketPool::GetSocket(SocketUniquePtr *ptr) {
        const int pool_cap = turbo::get_flag(FLAGS_max_connection_pool_size);

        // Historical note: SocketPool used to be sharded into SubSocketPools
        // (keyed by pthread-id) to cut contention. Sharding made connections
        // churn in real deployments though: with one client talking to many
        // servers, each socket is lowly contended and threads created pooled
        // sockets in their own sub pool instead of reusing sockets idle in
        // other sub pools, which CloseIdleConnections (socket_map.cpp) then
        // closed — frequent create-and-close. The sharding only helped the
        // benchmark case of one server hammered by one many-threaded client,
        // and was removed starting from r32203.

        SocketId sid = 0;
        if (pool_cap > 0) {
            while (true) {
                {
                    KRPC_SCOPED_LOCK(_mutex);
                    if (_pool.empty()) {
                        break;
                    }
                    sid = _pool.back();
                    _pool.pop_back();
                }
                _numfree.fetch_sub(1, kutil::memory_order_relaxed);
                // Resolve outside the lock: most of the time the pooled
                // socket is still valid.
                if (Socket::Address(sid, ptr) == 0) {
                    _numinflight.fetch_add(1, kutil::memory_order_relaxed);
                    return 0;
                }
                // The popped socket was already recycled; try the next one.
            }
        }
        // Pool exhausted or pooling disabled: create a new connection.
        SocketOptions opt = _options;
        opt.health_check_interval_s = -1;
        if (get_client_side_messenger()->Create(opt, &sid) == 0 &&
            Socket::Address(sid, ptr) == 0) {
            _numinflight.fetch_add(1, kutil::memory_order_relaxed);
            return 0;
        }
        return -1;
    }

    // Returns `sock' to the pool, or closes it when the pool already holds
    // FLAGS_max_connection_pool_size sockets.
    inline void SocketPool::ReturnSocket(Socket *sock) {
        // NOTE: save the gflag which may be reloaded at any time.
        const int pool_cap = turbo::get_flag(FLAGS_max_connection_pool_size);

        // Optimistically count this socket as free; roll the counter back if
        // the pool turned out to be full.
        const int prev_free = _numfree.fetch_add(1, kutil::memory_order_relaxed);
        if (prev_free < pool_cap) {
            const SocketId returned_id = sock->id();
            KRPC_SCOPED_LOCK(_mutex);
            _pool.push_back(returned_id);
        } else {
            _numfree.fetch_sub(1, kutil::memory_order_relaxed);
            sock->SetFailed(EUNUSED, "Close unused pooled socket");
        }
        _numinflight.fetch_sub(1, kutil::memory_order_relaxed);
    }

    // Copies up to `max_count' pooled socket ids into *out (all of them when
    // max_count is 0). *out is cleared first.
    inline void SocketPool::ListSockets(std::vector<SocketId> *out, size_t max_count) {
        out->clear();
        // NOTE: size() of vector is thread-unsafe and may return a very
        // large value during resizing.
        _mutex.lock();
        size_t expected_size = _pool.size();
        if (max_count > 0 && max_count < expected_size) {
            expected_size = max_count;
        }
        if (out->capacity() < expected_size) {
            // Allocate outside the lock to keep the critical section short.
            _mutex.unlock();
            out->reserve(expected_size + 4); // pool may add sockets.
            _mutex.lock();
        }
        if (max_count == 0) {
            out->insert(out->end(), _pool.begin(), _pool.end());
        } else {
            // BUGFIX: the pool may have shrunk while the mutex was released
            // for reserve() above; re-clamp `expected_size' to the current
            // size, otherwise _pool[i] below could read past the end.
            if (expected_size > _pool.size()) {
                expected_size = _pool.size();
            }
            for (size_t i = 0; i < expected_size; ++i) {
                out->push_back(_pool[i]);
            }
        }
        _mutex.unlock();
    }

    // Slow path of GetOrNewSharedPart(): creates _shared_part when it is
    // still NULL. Multiple threads may race here; the CAS ensures exactly one
    // SharedPart wins and the losers release their speculative instance.
    // Returns the (never NULL) installed SharedPart.
    Socket::SharedPart *Socket::GetOrNewSharedPartSlower() {
        // Create _shared_part optimistically.
        SharedPart *shared_part = GetSharedPart();
        if (shared_part == NULL) {
            shared_part = new SharedPart(id());
            // The reference owned by _shared_part itself.
            shared_part->AddRefManually();
            SharedPart *expected = NULL;
            if (!_shared_part.compare_exchange_strong(
                    expected, shared_part, kutil::memory_order_acq_rel)) {
                // Another thread installed its SharedPart first; discard ours
                // and adopt the winner (stored into `expected' by the CAS).
                shared_part->RemoveRefManually();
                CHECK(expected);
                shared_part = expected;
            }
        }
        return shared_part;
    }

    // Redirects this socket's statistics to `main_socket''s SharedPart so
    // that stats of derived (pooled/short) sockets accumulate on the main
    // socket. A reference is taken on the new part before publishing it, and
    // the reference held through the previous part is dropped afterwards.
    void Socket::ShareStats(Socket *main_socket) {
        SharedPart *new_sp = main_socket->GetOrNewSharedPart();
        new_sp->AddRefManually();
        SharedPart *old_sp =
                _shared_part.exchange(new_sp, kutil::memory_order_acq_rel);
        if (old_sp != NULL) {
            old_sp->RemoveRefManually();
        }
    }

    // Fetches a pooled connection to the same remote side into
    // *pooled_socket, lazily creating the SocketPool inside this socket's
    // SharedPart on first use. Returns 0 on success, -1 on failure.
    int Socket::GetPooledSocket(SocketUniquePtr *pooled_socket) {
        if (pooled_socket == NULL) {
            LOG(ERROR) << "pooled_socket is NULL";
            return -1;
        }
        SharedPart *main_sp = GetOrNewSharedPart();
        if (main_sp == NULL) {
            LOG(ERROR) << "_shared_part is NULL";
            return -1;
        }
        // Create socket_pool optimistically; losers of the CAS below delete
        // their speculative pool and adopt the winner's.
        SocketPool *socket_pool = main_sp->socket_pool.load(kutil::memory_order_consume);
        if (socket_pool == NULL) {
            // New pooled sockets inherit this socket's connection settings.
            SocketOptions opt;
            opt.remote_side = remote_side();
            opt.user = user();
            opt.on_edge_triggered_events = _on_edge_triggered_events;
            opt.initial_ssl_ctx = _ssl_ctx;
            opt.keytable_pool = _keytable_pool;
            opt.app_connect = _app_connect;
            opt.use_rdma = (_rdma_ep) ? true : false;
            socket_pool = new SocketPool(opt);
            SocketPool *expected = NULL;
            if (!main_sp->socket_pool.compare_exchange_strong(
                    expected, socket_pool, kutil::memory_order_acq_rel)) {
                delete socket_pool;
                CHECK(expected);
                socket_pool = expected;
            }
        }
        if (socket_pool->GetSocket(pooled_socket) != 0) {
            return -1;
        }
        // Account the pooled socket's stats on this (main) socket.
        (*pooled_socket)->ShareStats(this);
        CHECK((*pooled_socket)->parsing_context() == NULL)
                        << "context=" << (*pooled_socket)->parsing_context()
                        << " is not NULL when " << *(*pooled_socket) << " is got from"
                                                                        " SocketPool, the protocol implementation is buggy";
        return 0;
    }

    // Hands this (pooled) socket back to the SocketPool it was taken from.
    // Detaches _shared_part first; the socket must have a shared part with a
    // socket_pool, otherwise it is failed with EINVAL. Returns 0 on success,
    // -1 on error.
    int Socket::ReturnToPool() {
        SharedPart *sp = _shared_part.exchange(NULL, kutil::memory_order_acquire);
        if (sp == NULL) {
            LOG(ERROR) << "_shared_part is NULL";
            SetFailed(EINVAL, "_shared_part is NULL");
            return -1;
        }
        SocketPool *pool = sp->socket_pool.load(kutil::memory_order_consume);
        if (pool == NULL) {
            LOG(ERROR) << "_shared_part->socket_pool is NULL";
            SetFailed(EINVAL, "_shared_part->socket_pool is NULL");
            sp->RemoveRefManually();
            return -1;
        }
        CHECK(parsing_context() == NULL)
                        << "context=" << parsing_context() << " is not released when "
                        << *this << " is returned to SocketPool, the protocol "
                                    "implementation is buggy";
        // NOTE: be careful with the sequence.
        // - related fields must be reset before returning to pool
        // - sp must be released after returning to pool because it owns pool
        _connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
        _controller_released_socket.store(false, kutil::memory_order_relaxed);
        // Reset the write timestamp to make the returned connection live (longer)
        // This is useful for using a fake Socket + SocketConnection impl. to integrate
        // 3rd-party client into kRPC (like MySQL Client).
        _last_writetime_us.store(kutil::cpuwide_time_us(), kutil::memory_order_relaxed);
        pool->ReturnSocket(this);
        sp->RemoveRefManually();
        return 0;
    }

    // True iff this socket's SharedPart exists and already owns a SocketPool.
    bool Socket::HasSocketPool() const {
        SharedPart *sp = GetSharedPart();
        return sp != NULL &&
               sp->socket_pool.load(kutil::memory_order_consume) != NULL;
    }

    // Lists up to `max_count' socket ids currently idle in this socket's
    // pool (all of them when max_count is 0). *out is left empty when no
    // shared part or pool exists yet.
    void Socket::ListPooledSockets(std::vector<SocketId> *out, size_t max_count) {
        out->clear();
        SharedPart *sp = GetSharedPart();
        if (sp != NULL) {
            SocketPool *pool = sp->socket_pool.load(kutil::memory_order_consume);
            if (pool != NULL) {
                pool->ListSockets(out, max_count);
            }
        }
    }

    // Reads the pool counters: *numfree = sockets idle in the pool,
    // *numinflight = sockets handed out and not yet returned. Returns false
    // (outputs untouched) when this socket has no pool.
    bool Socket::GetPooledSocketStats(int *numfree, int *numinflight) {
        SharedPart *sp = GetSharedPart();
        SocketPool *pool = (sp != NULL)
                ? sp->socket_pool.load(kutil::memory_order_consume)
                : NULL;
        if (pool == NULL) {
            return false;
        }
        *numfree = pool->_numfree.load(kutil::memory_order_relaxed);
        *numinflight = pool->_numinflight.load(kutil::memory_order_relaxed);
        return true;
    }

    // Creates a brand-new (short/one-shot) connection to the same remote
    // side, inheriting this socket's connection settings, and stores it in
    // *short_socket. Returns 0 on success, -1 on failure.
    int Socket::GetShortSocket(SocketUniquePtr *short_socket) {
        if (short_socket == NULL) {
            LOG(ERROR) << "short_socket is NULL";
            return -1;
        }
        // Clone this socket's connection settings for the new socket.
        SocketOptions opt;
        opt.remote_side = remote_side();
        opt.user = user();
        opt.on_edge_triggered_events = _on_edge_triggered_events;
        opt.initial_ssl_ctx = _ssl_ctx;
        opt.keytable_pool = _keytable_pool;
        opt.app_connect = _app_connect;
        opt.use_rdma = (_rdma_ep) ? true : false;
        SocketId new_id;
        if (get_client_side_messenger()->Create(opt, &new_id) != 0 ||
            Socket::Address(new_id, short_socket) != 0) {
            return -1;
        }
        // Account the short socket's stats on this (main) socket.
        (*short_socket)->ShareStats(this);
        return 0;
    }

    // Returns (in *out) the "agent" socket associated with this one,
    // creating a new short socket when the cached one is absent, dead, or
    // rejected by `checkfn'. Concurrent callers race on _agent_socket_id via
    // CAS; losers release their candidate and retry with the winner's id.
    // Returns 0 on success, -1 when a short socket cannot be created.
    int Socket::GetAgentSocket(SocketUniquePtr *out, bool (*checkfn)(Socket *)) {
        SocketId id = _agent_socket_id.load(kutil::memory_order_relaxed);
        SocketUniquePtr tmp_sock;
        do {
            // Fast path: the cached agent is still addressable and accepted.
            if (Socket::Address(id, &tmp_sock) == 0) {
                if (checkfn == NULL || checkfn(tmp_sock.get())) {
                    out->swap(tmp_sock);
                    return 0;
                }
                // Rejected by checkfn: drop it and build a replacement.
                tmp_sock->ReleaseAdditionalReference();
            }
            // Create candidates until one passes checkfn.
            do {
                if (GetShortSocket(&tmp_sock) != 0) {
                    LOG(ERROR) << "Fail to get short socket from " << *this;
                    return -1;
                }
                if (checkfn == NULL || checkfn(tmp_sock.get())) {
                    break;
                }
                tmp_sock->ReleaseAdditionalReference();
            } while (1);

            // Publish the candidate. On failure `id' is updated to the
            // current agent id by compare_exchange_strong.
            if (_agent_socket_id.compare_exchange_strong(
                    id, tmp_sock->id(), kutil::memory_order_acq_rel)) {
                out->swap(tmp_sock);
                return 0;
            }
            // Another thread installed an agent first; discard ours.
            tmp_sock->ReleaseAdditionalReference();
            // id was updated, re-address
        } while (1);
    }

    // Like GetAgentSocket() but never creates one: returns -1 when no agent
    // id is recorded (or it can no longer be addressed), 0 on success.
    int Socket::PeekAgentSocket(SocketUniquePtr *out) const {
        const SocketId agent_id = _agent_socket_id.load(kutil::memory_order_relaxed);
        return (agent_id == INVALID_SOCKET_ID) ? -1 : Address(agent_id, out);
    }

    // Copies this socket's extended statistics into *s, or zeroes *s when no
    // SharedPart/extended_stat has been created yet.
    void Socket::GetStat(SocketStat *s) const {
        BAIDU_CASSERT(offsetof(Socket, _preferred_index) >= 64, different_cacheline);
        BAIDU_CASSERT(sizeof(WriteRequest) == 64, sizeof_write_request_is_64);

        SharedPart *sp = GetSharedPart();
        if (sp == NULL || sp->extended_stat == NULL) {
            memset(s, 0, sizeof(*s));
        } else {
            *s = *sp->extended_stat;
        }
    }

    // Accumulates `bytes' read from this connection into the shared
    // input-size counter (creating the SharedPart on first use).
    void Socket::AddInputBytes(size_t bytes) {
        GetOrNewSharedPart()->in_size.fetch_add(bytes, kutil::memory_order_relaxed);
    }

    // Accumulates `count' parsed input messages into the shared counter
    // (creating the SharedPart on first use).
    void Socket::AddInputMessages(size_t count) {
        GetOrNewSharedPart()->in_num_messages.fetch_add(count, kutil::memory_order_relaxed);
    }

    // Subtracts `bytes' from the pending-write backlog and clears the
    // _overcrowded flag once the backlog drops below
    // FLAGS_socket_max_unwritten_bytes.
    void Socket::CancelUnwrittenBytes(size_t bytes) {
        // fetch_sub returns the value *before* the subtraction.
        const int64_t prev_unwritten =
                _unwritten_bytes.fetch_sub(bytes, kutil::memory_order_relaxed);
        if (prev_unwritten < (int64_t) bytes + turbo::get_flag(FLAGS_socket_max_unwritten_bytes)) {
            _overcrowded = false;
        }
    }

    // Accumulates `bytes' written to this connection into the shared
    // output-size counter, refreshes the last-write timestamp (used for idle
    // detection) and removes the bytes from the pending-write backlog.
    void Socket::AddOutputBytes(size_t bytes) {
        GetOrNewSharedPart()->out_size.fetch_add(bytes, kutil::memory_order_relaxed);
        _last_writetime_us.store(kutil::cpuwide_time_us(),
                                 kutil::memory_order_relaxed);
        CancelUnwrittenBytes(bytes);
    }

    // Accumulates `count' written output messages into the shared counter
    // (creating the SharedPart on first use).
    void Socket::AddOutputMessages(size_t count) {
        GetOrNewSharedPart()->out_num_messages.fetch_add(count, kutil::memory_order_relaxed);
    }

    // Id of the socket that created this one's SharedPart, or
    // INVALID_SOCKET_ID when no SharedPart exists yet.
    SocketId Socket::main_socket_id() const {
        SharedPart *sp = GetSharedPart();
        return (sp != NULL) ? sp->creator_socket_id : INVALID_SOCKET_ID;
    }

    // Called when a progressive (streamed) read finishes. The socket may
    // only be recycled after BOTH the controller released it and the read
    // completed; whichever event happens second performs the recycle:
    // - load() true  -> controller already released the socket, recycle now.
    // - otherwise exchange(true) marks the read as completed; if it returns
    //   true the controller released concurrently, so recycle here as well.
    void Socket::OnProgressiveReadCompleted() {
        if (is_read_progressive() &&
            (_controller_released_socket.load(kutil::memory_order_relaxed) ||
             _controller_released_socket.exchange(
                     true, kutil::memory_order_relaxed))) {
            if (_connection_type_for_progressive_read == CONNECTION_TYPE_POOLED) {
                ReturnToPool();
            } else if (_connection_type_for_progressive_read == CONNECTION_TYPE_SHORT) {
                SetFailed(EUNUSED, "[%s]Close short connection", __FUNCTION__);
            }
        }
    }

    // Starts with no underlying SSL_CTX; raw_ctx is populated by the owner.
    SocketSSLContext::SocketSSLContext()
            : raw_ctx(NULL) {}

    // Releases the owned SSL_CTX, if any.
    SocketSSLContext::~SocketSSLContext() {
        if (raw_ctx) {
            SSL_CTX_free(raw_ctx);
        }
    }

} // namespace krpc


namespace std {
    // Prints a compact one-line description of a Socket.
    // NOTE: The output should be consistent with Socket::description()
    ostream &operator<<(ostream &os, const krpc::Socket &sock) {
        os << "Socket{id=" << sock.id();
        const int sock_fd = sock.fd();
        if (sock_fd >= 0) {
            os << " fd=" << sock_fd;
        }
        os << " addr=" << sock.remote_side();
        const int port = sock.local_side().port;
        if (port > 0) {
            os << ':' << port;
        }
        os << "} (" << (void *) &sock << ')';
        return os;
    }
}
